[Response Ops] Keep task document when enabling/disabling rules (#139826)

* wip

* wip

* Fixing types and adding unit tests for task manager disable

* Updating to enable/disable. Update rules client to use new fns

* Updating unit tests. Fixing enable to still schedule task if necessary

* Adding functional test for task manager migration

* Fixing query. Updating functional tests

* Setting scheduledTaskId to null on disable only if it does not match rule id

* Updating README

* Fixing tests

* Task manager runner doesn't overwrite enabled on update

* Updating migration to set enabled: false for failed and unrecognized tasks

* Fixing tests

* PR feedback

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2022-09-12 09:35:54 -04:00 committed by GitHub
parent d42e0e1166
commit 0cf0e3dd97
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 1070 additions and 266 deletions

View file

@ -384,7 +384,7 @@ export interface GetActionErrorLogByIdParams {
sort: estypes.Sort;
}
interface ScheduleRuleOptions {
interface ScheduleTaskOptions {
id: string;
consumer: string;
ruleTypeId: string;
@ -589,7 +589,7 @@ export class RulesClient {
if (data.enabled) {
let scheduledTask;
try {
scheduledTask = await this.scheduleRule({
scheduledTask = await this.scheduleTask({
id: createdAlert.id,
consumer: data.consumer,
ruleTypeId: rawRule.alertTypeId,
@ -2138,7 +2138,24 @@ export class RulesClient {
} catch (e) {
throw e;
}
const scheduledTask = await this.scheduleRule({
}
let scheduledTaskIdToCreate: string | null = null;
if (attributes.scheduledTaskId) {
// If scheduledTaskId defined in rule SO, make sure it exists
try {
await this.taskManager.get(attributes.scheduledTaskId);
} catch (err) {
scheduledTaskIdToCreate = id;
}
} else {
// If scheduledTaskId doesn't exist in rule SO, set it to rule ID
scheduledTaskIdToCreate = id;
}
if (scheduledTaskIdToCreate) {
// Schedule the task if it doesn't exist
const scheduledTask = await this.scheduleTask({
id,
consumer: attributes.consumer,
ruleTypeId: attributes.alertTypeId,
@ -2148,6 +2165,9 @@ export class RulesClient {
await this.unsecuredSavedObjectsClient.update('alert', id, {
scheduledTaskId: scheduledTask.id,
});
} else {
// Task exists so set enabled to true
await this.taskManager.bulkEnableDisable([attributes.scheduledTaskId!], true);
}
}
@ -2282,14 +2302,21 @@ export class RulesClient {
this.updateMeta({
...attributes,
enabled: false,
scheduledTaskId: null,
scheduledTaskId: attributes.scheduledTaskId === id ? attributes.scheduledTaskId : null,
updatedBy: await this.getUserName(),
updatedAt: new Date().toISOString(),
}),
{ version }
);
// If the scheduledTaskId does not match the rule id, we should
// remove the task, otherwise mark the task as disabled
if (attributes.scheduledTaskId) {
await this.taskManager.removeIfExists(attributes.scheduledTaskId);
if (attributes.scheduledTaskId !== id) {
await this.taskManager.removeIfExists(attributes.scheduledTaskId);
} else {
await this.taskManager.bulkEnableDisable([attributes.scheduledTaskId], false);
}
}
}
}
@ -2767,7 +2794,7 @@ export class RulesClient {
return this.spaceId;
}
private async scheduleRule(opts: ScheduleRuleOptions) {
private async scheduleTask(opts: ScheduleTaskOptions) {
const { id, consumer, ruleTypeId, schedule, throwOnConflict } = opts;
const taskInstance = {
id, // use the same ID for task document as the rule
@ -2784,6 +2811,7 @@ export class RulesClient {
alertInstances: {},
},
scope: ['alerting'],
enabled: true,
};
try {
return await this.taskManager.schedule(taskInstance);

View file

@ -463,6 +463,7 @@ describe('create()', () => {
expect(taskManager.schedule.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"enabled": true,
"id": "1",
"params": Object {
"alertId": "1",

View file

@ -60,7 +60,7 @@ const rulesClientParams: jest.Mocked<ConstructorOptions> = {
beforeEach(() => {
getBeforeSetup(rulesClientParams, taskManager, ruleTypeRegistry);
taskManager.get.mockResolvedValue({
id: 'task-123',
id: '1',
taskType: 'alerting:123',
scheduledAt: new Date(),
attempts: 1,
@ -81,7 +81,7 @@ setGlobalDate();
describe('disable()', () => {
let rulesClient: RulesClient;
const existingAlert = {
const existingRule = {
id: '1',
type: 'alert',
attributes: {
@ -89,7 +89,7 @@ describe('disable()', () => {
schedule: { interval: '10s' },
alertTypeId: 'myType',
enabled: true,
scheduledTaskId: 'task-123',
scheduledTaskId: '1',
actions: [
{
group: 'default',
@ -105,10 +105,10 @@ describe('disable()', () => {
version: '123',
references: [],
};
const existingDecryptedAlert = {
...existingAlert,
const existingDecryptedRule = {
...existingRule,
attributes: {
...existingAlert.attributes,
...existingRule.attributes,
apiKey: Buffer.from('123:abc').toString('base64'),
apiKeyOwner: 'elastic',
},
@ -118,12 +118,12 @@ describe('disable()', () => {
beforeEach(() => {
rulesClient = new RulesClient(rulesClientParams);
unsecuredSavedObjectsClient.get.mockResolvedValue(existingAlert);
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValue(existingDecryptedAlert);
unsecuredSavedObjectsClient.get.mockResolvedValue(existingRule);
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValue(existingDecryptedRule);
});
describe('authorization', () => {
test('ensures user is authorised to disable this type of alert under the consumer', async () => {
test('ensures user is authorised to disable this type of rule under the consumer', async () => {
await rulesClient.disable({ id: '1' });
expect(authorization.ensureAuthorized).toHaveBeenCalledWith({
@ -134,7 +134,7 @@ describe('disable()', () => {
});
});
test('throws when user is not authorised to disable this type of alert', async () => {
test('throws when user is not authorised to disable this type of rule', async () => {
authorization.ensureAuthorized.mockRejectedValue(
new Error(`Unauthorized to disable a "myType" alert for "myApp"`)
);
@ -191,7 +191,7 @@ describe('disable()', () => {
});
});
test('disables an alert', async () => {
test('disables an rule', async () => {
await rulesClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled();
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
@ -208,7 +208,7 @@ describe('disable()', () => {
meta: {
versionApiKeyLastmodified: 'v7.10.0',
},
scheduledTaskId: null,
scheduledTaskId: '1',
apiKey: 'MTIzOmFiYw==',
apiKeyOwner: 'elastic',
updatedAt: '2019-02-12T21:01:22.479Z',
@ -229,11 +229,12 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
test('disables the rule with calling event log to "recover" the alert instances from the task state', async () => {
const scheduledTaskId = 'task-123';
const scheduledTaskId = '1';
taskManager.get.mockResolvedValue({
id: scheduledTaskId,
taskType: 'alerting:123',
@ -278,7 +279,7 @@ describe('disable()', () => {
meta: {
versionApiKeyLastmodified: 'v7.10.0',
},
scheduledTaskId: null,
scheduledTaskId: '1',
apiKey: 'MTIzOmFiYw==',
apiKeyOwner: 'elastic',
updatedAt: '2019-02-12T21:01:22.479Z',
@ -299,7 +300,8 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
expect(eventLogger.logEvent).toHaveBeenCalledTimes(1);
expect(eventLogger.logEvent.mock.calls[0][0]).toStrictEqual({
@ -359,7 +361,7 @@ describe('disable()', () => {
meta: {
versionApiKeyLastmodified: 'v7.10.0',
},
scheduledTaskId: null,
scheduledTaskId: '1',
apiKey: 'MTIzOmFiYw==',
apiKeyOwner: 'elastic',
updatedAt: '2019-02-12T21:01:22.479Z',
@ -380,7 +382,8 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
expect(eventLogger.logEvent).toHaveBeenCalledTimes(0);
expect(rulesClientParams.logger.warn).toHaveBeenCalledWith(
@ -395,6 +398,97 @@ describe('disable()', () => {
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith(
'alert',
'1',
{
consumer: 'myApp',
schedule: { interval: '10s' },
alertTypeId: 'myType',
enabled: false,
scheduledTaskId: '1',
updatedAt: '2019-02-12T21:01:22.479Z',
updatedBy: 'elastic',
actions: [
{
group: 'default',
id: '1',
actionTypeId: '1',
actionRef: '1',
params: {
foo: true,
},
},
],
},
{
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
test(`doesn't disable already disabled rules`, async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({
...existingDecryptedRule,
attributes: {
...existingDecryptedRule.attributes,
actions: [],
enabled: false,
},
});
await rulesClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
test('swallows error when failing to load decrypted saved object', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockRejectedValueOnce(new Error('Fail'));
await rulesClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
expect(rulesClientParams.logger.error).toHaveBeenCalledWith(
'disable(): Failed to load API key of alert 1: Fail'
);
});
test('throws when unsecuredSavedObjectsClient update fails', async () => {
unsecuredSavedObjectsClient.update.mockRejectedValueOnce(new Error('Failed to update'));
await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to update"`
);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
test('throws when failing to disable task', async () => {
taskManager.bulkEnableDisable.mockRejectedValueOnce(new Error('Failed to disable task'));
await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to disable task"`
);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
test('removes task document if scheduled task id does not match rule id', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValue({
...existingRule,
attributes: {
...existingRule.attributes,
scheduledTaskId: 'task-123',
},
});
await rulesClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled();
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith(
'alert',
'1',
@ -422,48 +516,53 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
});
test(`doesn't disable already disabled alerts`, async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({
...existingDecryptedAlert,
test('throws when failing to remove existing task', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValue({
...existingRule,
attributes: {
...existingDecryptedAlert.attributes,
actions: [],
enabled: false,
...existingRule.attributes,
scheduledTaskId: 'task-123',
},
});
await rulesClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalled();
});
test('swallows error when failing to load decrypted saved object', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockRejectedValueOnce(new Error('Fail'));
await rulesClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled();
expect(taskManager.removeIfExists).toHaveBeenCalled();
expect(rulesClientParams.logger.error).toHaveBeenCalledWith(
'disable(): Failed to load API key of alert 1: Fail'
);
});
test('throws when unsecuredSavedObjectsClient update fails', async () => {
unsecuredSavedObjectsClient.update.mockRejectedValueOnce(new Error('Failed to update'));
await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to update"`
);
});
test('throws when failing to remove task from task manager', async () => {
taskManager.removeIfExists.mockRejectedValueOnce(new Error('Failed to remove task'));
await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to remove task"`
);
expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled();
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith(
'alert',
'1',
{
consumer: 'myApp',
schedule: { interval: '10s' },
alertTypeId: 'myType',
enabled: false,
scheduledTaskId: null,
updatedAt: '2019-02-12T21:01:22.479Z',
updatedBy: 'elastic',
actions: [
{
group: 'default',
id: '1',
actionTypeId: '1',
actionRef: '1',
params: {
foo: true,
},
},
],
},
{
version: '123',
}
);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
});
});

View file

@ -63,6 +63,7 @@ describe('enable()', () => {
consumer: 'myApp',
schedule: { interval: '10s' },
alertTypeId: 'myType',
scheduledTaskId: 'task-123',
enabled: false,
apiKey: 'MTIzOmFiYw==',
apiKeyOwner: 'elastic',
@ -91,7 +92,25 @@ describe('enable()', () => {
},
};
const mockTask = {
id: 'task-123',
taskType: 'alerting:123',
scheduledAt: new Date(),
attempts: 1,
status: TaskStatus.Idle,
runAt: new Date(),
startedAt: null,
retryAt: null,
state: {},
params: {
alertId: '1',
},
ownerId: null,
enabled: false,
};
beforeEach(() => {
jest.resetAllMocks();
getBeforeSetup(rulesClientParams, taskManager, ruleTypeRegistry);
(auditLogger.log as jest.Mock).mockClear();
rulesClient = new RulesClient(rulesClientParams);
@ -100,19 +119,7 @@ describe('enable()', () => {
rulesClientParams.createAPIKey.mockResolvedValue({
apiKeysEnabled: false,
});
taskManager.schedule.mockResolvedValue({
id: '1',
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
runAt: new Date(),
state: {},
params: {},
taskType: '',
startedAt: null,
retryAt: null,
ownerId: null,
});
taskManager.get.mockResolvedValue(mockTask);
});
describe('authorization', () => {
@ -208,6 +215,7 @@ describe('enable()', () => {
updatedBy: 'elastic',
apiKey: 'MTIzOmFiYw==',
apiKeyOwner: 'elastic',
scheduledTaskId: 'task-123',
actions: [
{
group: 'default',
@ -231,27 +239,7 @@ describe('enable()', () => {
version: '123',
}
);
expect(taskManager.schedule).toHaveBeenCalledWith({
id: '1',
taskType: `alerting:myType`,
params: {
alertId: '1',
spaceId: 'default',
consumer: 'myApp',
},
schedule: {
interval: '10s',
},
state: {
alertInstances: {},
alertTypeState: {},
previousStartedAt: null,
},
scope: ['alerting'],
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', {
scheduledTaskId: '1',
});
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
});
test('enables a rule that does not have an apiKey', async () => {
@ -283,6 +271,7 @@ describe('enable()', () => {
updatedBy: 'elastic',
apiKey: 'MTIzOmFiYw==',
apiKeyOwner: 'elastic',
scheduledTaskId: 'task-123',
actions: [
{
group: 'default',
@ -306,9 +295,10 @@ describe('enable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
});
test(`doesn't enable already enabled alerts`, async () => {
test(`doesn't update already enabled alerts but ensures task is enabled`, async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({
...existingRuleWithoutApiKey,
attributes: {
@ -321,7 +311,7 @@ describe('enable()', () => {
expect(rulesClientParams.getUserName).not.toHaveBeenCalled();
expect(rulesClientParams.createAPIKey).not.toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled();
expect(taskManager.schedule).not.toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
});
test('sets API key when createAPIKey returns one', async () => {
@ -345,6 +335,7 @@ describe('enable()', () => {
},
apiKey: Buffer.from('123:abc').toString('base64'),
apiKeyOwner: 'elastic',
scheduledTaskId: 'task-123',
updatedBy: 'elastic',
updatedAt: '2019-02-12T21:01:22.479Z',
actions: [
@ -370,6 +361,7 @@ describe('enable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
});
test('throws an error if API key creation throws', async () => {
@ -381,6 +373,7 @@ describe('enable()', () => {
await expect(
async () => await rulesClient.enable({ id: '1' })
).rejects.toThrowErrorMatchingInlineSnapshot(`"Error creating API key for rule: no"`);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
});
test('falls back when failing to getDecryptedAsInternalUser', async () => {
@ -391,6 +384,7 @@ describe('enable()', () => {
expect(rulesClientParams.logger.error).toHaveBeenCalledWith(
'enable(): Failed to load API key of alert 1: Fail'
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
});
test('throws error when failing to load the saved object using SOC', async () => {
@ -403,10 +397,10 @@ describe('enable()', () => {
expect(rulesClientParams.getUserName).not.toHaveBeenCalled();
expect(rulesClientParams.createAPIKey).not.toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled();
expect(taskManager.schedule).not.toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
});
test('throws error when failing to update the first time', async () => {
test('throws when unsecuredSavedObjectsClient update fails', async () => {
rulesClientParams.createAPIKey.mockResolvedValueOnce({
apiKeysEnabled: true,
result: { id: '123', name: '123', api_key: 'abc' },
@ -419,100 +413,53 @@ describe('enable()', () => {
);
expect(rulesClientParams.getUserName).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(1);
expect(taskManager.schedule).not.toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
});
test('throws error when failing to update the second time', async () => {
unsecuredSavedObjectsClient.update.mockReset();
unsecuredSavedObjectsClient.update.mockResolvedValueOnce({
...existingRuleWithoutApiKey,
attributes: {
...existingRuleWithoutApiKey.attributes,
enabled: true,
},
});
unsecuredSavedObjectsClient.update.mockRejectedValueOnce(
new Error('Fail to update second time')
);
await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Fail to update second time"`
);
expect(rulesClientParams.getUserName).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.schedule).toHaveBeenCalled();
});
test('throws error when failing to schedule task', async () => {
taskManager.schedule.mockRejectedValueOnce(new Error('Fail to schedule'));
await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Fail to schedule"`
);
expect(rulesClientParams.getUserName).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled();
});
test('enables a rule if conflict errors received when scheduling a task', async () => {
unsecuredSavedObjectsClient.create.mockResolvedValueOnce({
...existingRuleWithoutApiKey,
attributes: {
...existingRuleWithoutApiKey.attributes,
enabled: true,
apiKey: null,
apiKeyOwner: null,
updatedBy: 'elastic',
},
});
taskManager.schedule.mockRejectedValueOnce(
Object.assign(new Error('Conflict!'), { statusCode: 409 })
);
test('enables task when scheduledTaskId is defined and task exists', async () => {
await rulesClient.enable({ id: '1' });
expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled();
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.create).not.toBeCalledWith('api_key_pending_invalidation');
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith(
'alert',
'1',
{
name: 'name',
schedule: { interval: '10s' },
alertTypeId: 'myType',
consumer: 'myApp',
enabled: true,
meta: {
versionApiKeyLastmodified: kibanaVersion,
},
updatedAt: '2019-02-12T21:01:22.479Z',
updatedBy: 'elastic',
apiKey: 'MTIzOmFiYw==',
apiKeyOwner: 'elastic',
actions: [
{
group: 'default',
id: '1',
actionTypeId: '1',
actionRef: '1',
params: {
foo: true,
},
},
],
executionStatus: {
status: 'pending',
lastDuration: 0,
lastExecutionDate: '2019-02-12T21:01:22.479Z',
error: null,
warning: null,
},
},
{
version: '123',
}
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
});
test('throws error when enabling task fails', async () => {
taskManager.bulkEnableDisable.mockRejectedValueOnce(new Error('Failed to enable task'));
await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to enable task"`
);
expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled();
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled();
});
test('schedules task when scheduledTaskId is defined but task with that ID does not', async () => {
taskManager.schedule.mockResolvedValueOnce({
id: '1',
taskType: 'alerting:123',
scheduledAt: new Date(),
attempts: 1,
status: TaskStatus.Idle,
runAt: new Date(),
startedAt: null,
retryAt: null,
state: {},
params: {},
ownerId: null,
});
taskManager.get.mockRejectedValueOnce(new Error('Failed to get task!'));
await rulesClient.enable({ id: '1' });
expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled();
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.schedule).toHaveBeenCalledWith({
id: '1',
taskType: `alerting:myType`,
@ -524,6 +471,7 @@ describe('enable()', () => {
schedule: {
interval: '10s',
},
enabled: true,
state: {
alertInstances: {},
alertTypeState: {},
@ -531,7 +479,130 @@ describe('enable()', () => {
},
scope: ['alerting'],
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', {
expect(unsecuredSavedObjectsClient.update).toHaveBeenNthCalledWith(2, 'alert', '1', {
scheduledTaskId: '1',
});
});
test('schedules task when scheduledTaskId is not defined', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({
...existingRule,
attributes: { ...existingRule.attributes, scheduledTaskId: null },
});
taskManager.schedule.mockResolvedValueOnce({
id: '1',
taskType: 'alerting:123',
scheduledAt: new Date(),
attempts: 1,
status: TaskStatus.Idle,
runAt: new Date(),
startedAt: null,
retryAt: null,
state: {},
params: {},
ownerId: null,
});
await rulesClient.enable({ id: '1' });
expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled();
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.schedule).toHaveBeenCalledWith({
id: '1',
taskType: `alerting:myType`,
params: {
alertId: '1',
spaceId: 'default',
consumer: 'myApp',
},
schedule: {
interval: '10s',
},
enabled: true,
state: {
alertInstances: {},
alertTypeState: {},
previousStartedAt: null,
},
scope: ['alerting'],
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenNthCalledWith(2, 'alert', '1', {
scheduledTaskId: '1',
});
});
test('throws error when scheduling task fails', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({
...existingRule,
attributes: { ...existingRule.attributes, scheduledTaskId: null },
});
taskManager.schedule.mockRejectedValueOnce(new Error('Fail to schedule'));
await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Fail to schedule"`
);
expect(rulesClientParams.getUserName).toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.schedule).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(1);
});
test('succeeds if conflict errors received when scheduling a task', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({
...existingRule,
attributes: { ...existingRule.attributes, scheduledTaskId: null },
});
taskManager.schedule.mockRejectedValueOnce(
Object.assign(new Error('Conflict!'), { statusCode: 409 })
);
await rulesClient.enable({ id: '1' });
expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled();
expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.schedule).toHaveBeenCalled();
});
test('throws error when update after scheduling task fails', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({
...existingRule,
attributes: { ...existingRule.attributes, scheduledTaskId: null },
});
taskManager.schedule.mockResolvedValueOnce({
id: '1',
taskType: 'alerting:123',
scheduledAt: new Date(),
attempts: 1,
status: TaskStatus.Idle,
runAt: new Date(),
startedAt: null,
retryAt: null,
state: {},
params: {},
ownerId: null,
});
unsecuredSavedObjectsClient.update.mockResolvedValueOnce({
...existingRule,
attributes: {
...existingRule.attributes,
enabled: true,
},
});
unsecuredSavedObjectsClient.update.mockRejectedValueOnce(
new Error('Fail to update after scheduling task')
);
await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Fail to update after scheduling task"`
);
expect(rulesClientParams.getUserName).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.schedule).toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenNthCalledWith(2, 'alert', '1', {
scheduledTaskId: '1',
});
});

View file

@ -141,9 +141,9 @@ async function enable(success: boolean) {
return expectConflict(success, err);
}
// a successful enable call makes 2 calls to update, so that's 3 total,
// 1 with conflict + 2 on success
expectSuccess(success, 3);
// a successful enable call makes 1 call to update, so with
// conflict, we would expect 1 on conflict, 1 on success
expectSuccess(success, 2);
}
async function disable(success: boolean) {

View file

@ -328,6 +328,9 @@ The _Start_ Plugin api allow you to use Task Manager to facilitate your Plugin's
runSoon: (taskId: string) => {
// ...
},
bulkEnableDisable: (taskIds: string[], enabled: boolean) => {
// ...
},
bulkUpdateSchedules: (taskIds: string[], schedule: IntervalSchedule) => {
// ...
},
@ -418,6 +421,33 @@ export class Plugin {
}
```
#### bulkEnableDisable
Using `bulkEnableDisable` you can instruct TaskManger to update the `enabled` status of tasks.
Example:
```js
export class Plugin {
constructor() {
}
public setup(core: CoreSetup, plugins: { taskManager }) {
}
public start(core: CoreStart, plugins: { taskManager }) {
try {
const bulkDisableResults = await taskManager.bulkEnableDisable(
['97c2c4e7-d850-11ec-bf95-895ffd19f959', 'a5ee24d1-dce2-11ec-ab8d-cf74da82133d'],
false,
);
// If no error is thrown, the bulkEnableDisable has completed successfully.
// But some updates of some tasks can be failed, due to OCC 409 conflict for example
} catch(err: Error) {
// if error is caught, means the whole method requested has failed and tasks weren't updated
}
}
}
```
#### bulkUpdateSchedules
Using `bulkUpdatesSchedules` you can instruct TaskManger to update interval of tasks that are in `idle` status
(for the tasks which have `running` status, `schedule` and `runAt` will be recalculated after task run finishes).

View file

@ -30,7 +30,7 @@ export {
throwUnrecoverableError,
isEphemeralTaskRejectedDueToCapacityError,
} from './task_running';
export type { RunNowResult, BulkUpdateSchedulesResult } from './task_scheduling';
export type { RunNowResult, BulkUpdateTaskResult } from './task_scheduling';
export { getOldestIdleActionTask } from './queries/oldest_idle_action_task';
export {
IdleTaskWithExpiredRunAt,

View file

@ -30,6 +30,7 @@ const createStartMock = () => {
supportsEphemeralTasks: jest.fn(),
bulkUpdateSchedules: jest.fn(),
bulkSchedule: jest.fn(),
bulkEnableDisable: jest.fn(),
};
return mock;
};

View file

@ -53,6 +53,7 @@ export type TaskManagerStartContract = Pick<
| 'ephemeralRunNow'
| 'ensureScheduled'
| 'bulkUpdateSchedules'
| 'bulkEnableDisable'
| 'bulkSchedule'
> &
Pick<TaskStore, 'fetch' | 'aggregate' | 'get' | 'remove'> & {
@ -251,6 +252,7 @@ export class TaskManagerPlugin
bulkSchedule: (...args) => taskScheduling.bulkSchedule(...args),
ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
runSoon: (...args) => taskScheduling.runSoon(...args),
bulkEnableDisable: (...args) => taskScheduling.bulkEnableDisable(...args),
bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),
ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task),
supportsEphemeralTasks: () =>

View file

@ -13,6 +13,7 @@ import {
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
SortByRunAtAndRetryAt,
EnabledTask,
} from './mark_available_tasks_as_claimed';
import { TaskTypeDictionary } from '../task_type_dictionary';
@ -53,6 +54,8 @@ describe('mark_available_tasks_as_claimed', () => {
expect({
query: mustBeAllOf(
// Task must be enabled
EnabledTask,
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
@ -72,6 +75,17 @@ describe('mark_available_tasks_as_claimed', () => {
query: {
bool: {
must: [
{
bool: {
must: [
{
term: {
'task.enabled': true,
},
},
],
},
},
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
{

View file

@ -72,6 +72,18 @@ export const InactiveTasks: MustNotCondition = {
},
};
export const EnabledTask: MustCondition = {
bool: {
must: [
{
term: {
'task.enabled': true,
},
},
],
},
};
export const RunningOrClaimingTaskWithExpiredRetryAt: MustCondition = {
bool: {
must: [

View file

@ -312,6 +312,17 @@ describe('TaskClaiming', () => {
expect(query).toMatchObject({
bool: {
must: [
{
bool: {
must: [
{
term: {
'task.enabled': true,
},
},
],
},
},
{
bool: {
should: [
@ -437,6 +448,17 @@ if (doc['task.runAt'].size()!=0) {
organic: {
bool: {
must: [
{
bool: {
must: [
{
term: {
'task.enabled': true,
},
},
],
},
},
{
bool: {
should: [
@ -929,6 +951,17 @@ if (doc['task.runAt'].size()!=0) {
expect(query).toMatchObject({
bool: {
must: [
{
bool: {
must: [
{
term: {
'task.enabled': true,
},
},
],
},
},
{
bool: {
should: [

View file

@ -43,6 +43,7 @@ import {
SortByRunAtAndRetryAt,
tasksClaimedByOwner,
tasksOfType,
EnabledTask,
} from './mark_available_tasks_as_claimed';
import { TaskTypeDictionary } from '../task_type_dictionary';
import {
@ -384,6 +385,8 @@ export class TaskClaiming {
: 'taskTypesToSkip'
);
const queryForScheduledTasks = mustBeAllOf(
// Task must be enabled
EnabledTask,
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)

View file

@ -16,6 +16,9 @@
"retryAt": {
"type": "date"
},
"enabled": {
"type": "boolean"
},
"schedule": {
"properties": {
"interval": {

View file

@ -226,6 +226,47 @@ describe('successful migrations', () => {
expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance);
});
});
describe('8.5.0', () => {
test('adds enabled: true to tasks that are running, claiming, or idle', () => {
const migration850 = getMigrations()['8.5.0'];
const activeTasks = [
getMockData({
status: 'running',
}),
getMockData({
status: 'claiming',
}),
getMockData({
status: 'idle',
}),
];
activeTasks.forEach((task) => {
expect(migration850(task, migrationContext)).toEqual({
...task,
attributes: {
...task.attributes,
enabled: true,
},
});
});
});
test('does not modify tasks that are failed or unrecognized', () => {
const migration850 = getMigrations()['8.5.0'];
const inactiveTasks = [
getMockData({
status: 'failed',
}),
getMockData({
status: 'unrecognized',
}),
];
inactiveTasks.forEach((task) => {
expect(migration850(task, migrationContext)).toEqual(task);
});
});
});
});
describe('handles errors during migrations', () => {

View file

@ -42,6 +42,7 @@ export function getMigrations(): SavedObjectMigrationMap {
pipeMigrations(resetAttemptsAndStatusForTheTasksWithoutSchedule, resetUnrecognizedStatus),
'8.2.0'
),
'8.5.0': executeMigrationWithErrorHandling(pipeMigrations(addEnabledField), '8.5.0'),
};
}
@ -193,3 +194,20 @@ function resetAttemptsAndStatusForTheTasksWithoutSchedule(
return doc;
}
function addEnabledField(doc: SavedObjectUnsanitizedDoc<ConcreteTaskInstance>) {
if (
doc.attributes.status === TaskStatus.Failed ||
doc.attributes.status === TaskStatus.Unrecognized
) {
return doc;
}
return {
...doc,
attributes: {
...doc.attributes,
enabled: true,
},
};
}

View file

@ -277,6 +277,11 @@ export interface TaskInstance {
* The random uuid of the Kibana instance which claimed ownership of the task last
*/
ownerId?: string | null;
/**
* Indicates whether the task is currently enabled. Disabled tasks will not be claimed.
*/
enabled?: boolean;
}
/**
@ -371,7 +376,10 @@ export interface ConcreteTaskInstance extends TaskInstance {
/**
* A task instance that has an id and is ready for storage.
*/
export type EphemeralTask = Pick<ConcreteTaskInstance, 'taskType' | 'params' | 'state' | 'scope'>;
export type EphemeralTask = Pick<
ConcreteTaskInstance,
'taskType' | 'params' | 'state' | 'scope' | 'enabled'
>;
export type EphemeralTaskInstance = EphemeralTask &
Pick<ConcreteTaskInstance, 'id' | 'scheduledAt' | 'startedAt' | 'runAt' | 'status' | 'ownerId'>;

View file

@ -179,6 +179,7 @@ describe('TaskManagerRunner', () => {
schedule: {
interval: `${intervalMinutes}m`,
},
enabled: true,
},
definitions: {
bar: {
@ -198,6 +199,7 @@ describe('TaskManagerRunner', () => {
expect(instance.retryAt!.getTime()).toEqual(
instance.startedAt!.getTime() + intervalMinutes * 60 * 1000
);
expect(instance.enabled).not.toBeDefined();
});
test('calculates retryAt by default timout when it exceeds the schedule of a recurring task', async () => {
@ -211,6 +213,7 @@ describe('TaskManagerRunner', () => {
schedule: {
interval: `${intervalSeconds}s`,
},
enabled: true,
},
definitions: {
bar: {
@ -228,6 +231,7 @@ describe('TaskManagerRunner', () => {
const instance = store.update.mock.calls[0][0];
expect(instance.retryAt!.getTime()).toEqual(instance.startedAt!.getTime() + 5 * 60 * 1000);
expect(instance.enabled).not.toBeDefined();
});
test('calculates retryAt by timeout if it exceeds the schedule when running a recurring task', async () => {
@ -242,6 +246,7 @@ describe('TaskManagerRunner', () => {
schedule: {
interval: `${intervalSeconds}s`,
},
enabled: true,
},
definitions: {
bar: {
@ -262,6 +267,7 @@ describe('TaskManagerRunner', () => {
expect(instance.retryAt!.getTime()).toEqual(
instance.startedAt!.getTime() + timeoutMinutes * 60 * 1000
);
expect(instance.enabled).not.toBeDefined();
});
test('sets startedAt, status, attempts and retryAt when claiming a task', async () => {
@ -271,6 +277,7 @@ describe('TaskManagerRunner', () => {
const { runner, store } = await pendingStageSetup({
instance: {
id,
enabled: true,
attempts: initialAttempts,
schedule: undefined,
},
@ -296,6 +303,7 @@ describe('TaskManagerRunner', () => {
expect(instance.retryAt!.getTime()).toEqual(
minutesFromNow((initialAttempts + 1) * 5).getTime() + timeoutMinutes * 60 * 1000
);
expect(instance.enabled).not.toBeDefined();
});
test('uses getRetry (returning date) to set retryAt when defined', async () => {
@ -309,6 +317,7 @@ describe('TaskManagerRunner', () => {
id,
attempts: initialAttempts,
schedule: undefined,
enabled: true,
},
definitions: {
bar: {
@ -331,6 +340,7 @@ describe('TaskManagerRunner', () => {
expect(instance.retryAt!.getTime()).toEqual(
new Date(nextRetry.getTime() + timeoutMinutes * 60 * 1000).getTime()
);
expect(instance.enabled).not.toBeDefined();
});
test('it returns false when markTaskAsRunning fails due to VERSION_CONFLICT_STATUS', async () => {
@ -539,6 +549,7 @@ describe('TaskManagerRunner', () => {
id,
attempts: initialAttempts,
schedule: undefined,
enabled: true,
},
definitions: {
bar: {
@ -563,6 +574,7 @@ describe('TaskManagerRunner', () => {
expect(instance.retryAt!.getTime()).toEqual(
new Date(Date.now() + attemptDelay + timeoutDelay).getTime()
);
expect(instance.enabled).not.toBeDefined();
});
test('uses getRetry (returning false) to set retryAt when defined', async () => {
@ -575,6 +587,7 @@ describe('TaskManagerRunner', () => {
id,
attempts: initialAttempts,
schedule: undefined,
enabled: true,
},
definitions: {
bar: {
@ -596,6 +609,7 @@ describe('TaskManagerRunner', () => {
expect(instance.retryAt!).toBeNull();
expect(instance.status).toBe('running');
expect(instance.enabled).not.toBeDefined();
});
test('bypasses getRetry (returning false) of a recurring task to set retryAt when defined', async () => {
@ -609,6 +623,7 @@ describe('TaskManagerRunner', () => {
attempts: initialAttempts,
schedule: { interval: '1m' },
startedAt: new Date(),
enabled: true,
},
definitions: {
bar: {
@ -630,6 +645,7 @@ describe('TaskManagerRunner', () => {
const timeoutDelay = timeoutMinutes * 60 * 1000;
expect(instance.retryAt!.getTime()).toEqual(new Date(Date.now() + timeoutDelay).getTime());
expect(instance.enabled).not.toBeDefined();
});
describe('TaskEvents', () => {
@ -781,6 +797,7 @@ describe('TaskManagerRunner', () => {
attempts: initialAttempts,
params: { a: 'b' },
state: { hey: 'there' },
enabled: true,
},
definitions: {
bar: {
@ -803,6 +820,7 @@ describe('TaskManagerRunner', () => {
expect(instance.runAt.getTime()).toEqual(minutesFromNow(initialAttempts * 5).getTime());
expect(instance.params).toEqual({ a: 'b' });
expect(instance.state).toEqual({ hey: 'there' });
expect(instance.enabled).not.toBeDefined();
});
test('reschedules tasks that have an schedule', async () => {
@ -811,6 +829,7 @@ describe('TaskManagerRunner', () => {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
},
definitions: {
bar: {
@ -831,6 +850,7 @@ describe('TaskManagerRunner', () => {
expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime());
expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime());
expect(instance.enabled).not.toBeDefined();
});
test('expiration returns time after which timeout will have elapsed from start', async () => {
@ -951,6 +971,7 @@ describe('TaskManagerRunner', () => {
schedule: { interval: '20m' },
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
},
definitions: {
bar: {
@ -968,6 +989,7 @@ describe('TaskManagerRunner', () => {
const instance = store.update.mock.calls[0][0];
expect(instance.status).toBe('failed');
expect(instance.enabled).not.toBeDefined();
expect(onTaskEvent).toHaveBeenCalledWith(
withAnyTiming(
@ -1092,6 +1114,7 @@ describe('TaskManagerRunner', () => {
instance: {
id,
attempts: initialAttempts,
enabled: true,
},
definitions: {
bar: {
@ -1113,6 +1136,7 @@ describe('TaskManagerRunner', () => {
const instance = store.update.mock.calls[0][0];
expect(instance.runAt.getTime()).toEqual(nextRetry.getTime());
expect(instance.enabled).not.toBeDefined();
});
test('uses getRetry function (returning true) on error when defined', async () => {
@ -1124,6 +1148,7 @@ describe('TaskManagerRunner', () => {
instance: {
id,
attempts: initialAttempts,
enabled: true,
},
definitions: {
bar: {
@ -1146,6 +1171,7 @@ describe('TaskManagerRunner', () => {
const expectedRunAt = new Date(Date.now() + initialAttempts * 5 * 60 * 1000);
expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime());
expect(instance.enabled).not.toBeDefined();
});
test('uses getRetry function (returning false) on error when defined', async () => {
@ -1157,6 +1183,7 @@ describe('TaskManagerRunner', () => {
instance: {
id,
attempts: initialAttempts,
enabled: true,
},
definitions: {
bar: {
@ -1178,6 +1205,7 @@ describe('TaskManagerRunner', () => {
const instance = store.update.mock.calls[0][0];
expect(instance.status).toBe('failed');
expect(instance.enabled).not.toBeDefined();
});
test('bypasses getRetry function (returning false) on error of a recurring task', async () => {
@ -1191,6 +1219,7 @@ describe('TaskManagerRunner', () => {
attempts: initialAttempts,
schedule: { interval: '1m' },
startedAt: new Date(),
enabled: true,
},
definitions: {
bar: {
@ -1214,6 +1243,7 @@ describe('TaskManagerRunner', () => {
const nextIntervalDelay = 60000; // 1m
const expectedRunAt = new Date(Date.now() + nextIntervalDelay);
expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime());
expect(instance.enabled).not.toBeDefined();
});
test('Fails non-recurring task when maxAttempts reached', async () => {
@ -1224,6 +1254,7 @@ describe('TaskManagerRunner', () => {
id,
attempts: initialAttempts,
schedule: undefined,
enabled: true,
},
definitions: {
bar: {
@ -1246,6 +1277,7 @@ describe('TaskManagerRunner', () => {
expect(instance.status).toEqual('failed');
expect(instance.retryAt!).toBeNull();
expect(instance.runAt.getTime()).toBeLessThanOrEqual(Date.now());
expect(instance.enabled).not.toBeDefined();
});
test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => {
@ -1258,6 +1290,7 @@ describe('TaskManagerRunner', () => {
attempts: initialAttempts,
schedule: { interval: `${intervalSeconds}s` },
startedAt: new Date(),
enabled: true,
},
definitions: {
bar: {
@ -1281,6 +1314,7 @@ describe('TaskManagerRunner', () => {
expect(instance.runAt.getTime()).toEqual(
new Date(Date.now() + intervalSeconds * 1000).getTime()
);
expect(instance.enabled).not.toBeDefined();
});
describe('TaskEvents', () => {
@ -1450,6 +1484,7 @@ describe('TaskManagerRunner', () => {
instance: {
id,
startedAt: new Date(),
enabled: true,
},
definitions: {
bar: {
@ -1468,6 +1503,7 @@ describe('TaskManagerRunner', () => {
const instance = store.update.mock.calls[0][0];
expect(instance.status).toBe('failed');
expect(instance.enabled).not.toBeDefined();
expect(onTaskEvent).toHaveBeenCalledWith(
withAnyTiming(

View file

@ -14,7 +14,7 @@
import apm from 'elastic-apm-node';
import uuid from 'uuid';
import { withSpan } from '@kbn/apm-utils';
import { identity, defaults, flow } from 'lodash';
import { identity, defaults, flow, omit } from 'lodash';
import { Logger, SavedObjectsErrorHelpers, ExecutionContextStart } from '@kbn/core/server';
import { UsageCounter } from '@kbn/usage-collection-plugin/server';
import { Middleware } from '../lib/middleware';
@ -375,7 +375,7 @@ export class TaskManagerRunner implements TaskRunner {
this.instance = asReadyToRun(
(await this.bufferedTaskStore.update({
...taskInstance,
...taskWithoutEnabled(taskInstance),
status: TaskStatus.Running,
startedAt: now,
attempts,
@ -456,7 +456,7 @@ export class TaskManagerRunner implements TaskRunner {
private async releaseClaimAndIncrementAttempts(): Promise<Result<ConcreteTaskInstance, Error>> {
return promiseResult(
this.bufferedTaskStore.update({
...this.instance.task,
...taskWithoutEnabled(this.instance.task),
status: TaskStatus.Idle,
attempts: this.instance.task.attempts + 1,
startedAt: null,
@ -549,7 +549,7 @@ export class TaskManagerRunner implements TaskRunner {
retryAt: null,
ownerId: null,
},
this.instance.task
taskWithoutEnabled(this.instance.task)
)
)
);
@ -677,6 +677,12 @@ function howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil: Date | null)
return ownershipClaimedUntil ? ownershipClaimedUntil.getTime() - Date.now() : 0;
}
// Omits "enabled" field from task updates so we don't overwrite any user
// initiated changes to "enabled" while the task was running
function taskWithoutEnabled(task: ConcreteTaskInstance): ConcreteTaskInstance {
return omit(task, 'enabled');
}
// A type that extracts the Instance type out of TaskRunningStage
// This helps us to better communicate to the developer what the expected "stage"
// in a specific place in the code might be

View file

@ -9,6 +9,7 @@ import { TaskScheduling } from './task_scheduling';
const createTaskSchedulingMock = () => {
return {
bulkEnableDisable: jest.fn(),
ensureScheduled: jest.fn(),
schedule: jest.fn(),
runSoon: jest.fn(),

View file

@ -70,6 +70,26 @@ describe('TaskScheduling', () => {
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
});
});
test('allows scheduling tasks that are disabled', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task = {
taskType: 'foo',
enabled: false,
params: {},
state: {},
};
await taskScheduling.schedule(task);
expect(mockTaskStore.schedule).toHaveBeenCalled();
expect(mockTaskStore.schedule).toHaveBeenCalledWith({
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: false,
});
});
@ -125,6 +145,133 @@ describe('TaskScheduling', () => {
});
});
describe('bulkEnableDisable', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
beforeEach(() => {
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: mockTask() }])
);
});
test('should search for tasks by ids enabled = true when disabling', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable([id], false);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
expect(mockTaskStore.fetch).toHaveBeenCalledWith({
query: {
bool: {
must: [
{
terms: {
_id: [`task:${id}`],
},
},
{
term: {
'task.enabled': true,
},
},
],
},
},
size: 100,
});
});
test('should search for tasks by ids enabled = false when enabling', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable([id], true);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
expect(mockTaskStore.fetch).toHaveBeenCalledWith({
query: {
bool: {
must: [
{
terms: {
_id: [`task:${id}`],
},
},
{
term: {
'task.enabled': false,
},
},
],
},
},
size: 100,
});
});
test('should split search on chunks when input ids array too large', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable(Array.from({ length: 1250 }), false);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13);
});
test('should transform response into correct format', async () => {
const successfulTask = mockTask({
id: 'task-1',
enabled: false,
schedule: { interval: '1h' },
});
const failedTask = mockTask({ id: 'task-2', enabled: true, schedule: { interval: '1h' } });
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([
{ tag: 'ok', value: successfulTask },
{ tag: 'err', error: { entity: failedTask, error: new Error('fail') } },
])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const result = await taskScheduling.bulkEnableDisable(
[successfulTask.id, failedTask.id],
false
);
expect(result).toEqual({
tasks: [successfulTask],
errors: [{ task: failedTask, error: new Error('fail') }],
});
});
test('should not disable task if it is already disabled', async () => {
const task = mockTask({ id, enabled: false, schedule: { interval: '3h' } });
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable([id], false);
const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];
expect(bulkUpdatePayload).toHaveLength(0);
});
test('should not enable task if it is already enabled', async () => {
const task = mockTask({ id, enabled: true, schedule: { interval: '3h' } });
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable([id], true);
const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];
expect(bulkUpdatePayload).toHaveLength(0);
});
});
describe('bulkUpdateSchedules', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
beforeEach(() => {
@ -258,6 +405,7 @@ describe('TaskScheduling', () => {
expect(bulkUpdatePayload[0].runAt.getTime()).toBeLessThanOrEqual(Date.now());
});
});
describe('runSoon', () => {
test('resolves when the task update succeeds', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
@ -513,6 +661,40 @@ describe('TaskScheduling', () => {
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
]);
});
test('allows scheduling tasks that are disabled', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task1 = {
taskType: 'foo',
params: {},
state: {},
};
const task2 = {
taskType: 'foo',
params: {},
state: {},
enabled: false,
};
await taskScheduling.bulkSchedule([task1, task2]);
expect(mockTaskStore.bulkSchedule).toHaveBeenCalled();
expect(mockTaskStore.bulkSchedule).toHaveBeenCalledWith([
{
...task1,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
{
...task2,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: false,
},
]);
});
@ -546,6 +728,7 @@ function mockTask(overrides: Partial<ConcreteTaskInstance> = {}): ConcreteTaskIn
taskType: 'foo',
schedule: undefined,
attempts: 0,
enabled: true,
status: TaskStatus.Claiming,
params: { hello: 'world' },
state: { baby: 'Henhen' },

View file

@ -48,7 +48,7 @@ import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTaskRejectedDueToCapacityError } from './task_running';
const VERSION_CONFLICT_STATUS = 409;
const BULK_ACTION_SIZE = 100;
export interface TaskSchedulingOpts {
logger: Logger;
taskStore: TaskStore;
@ -61,7 +61,7 @@ export interface TaskSchedulingOpts {
/**
* return type of TaskScheduling.bulkUpdateSchedules method
*/
export interface BulkUpdateSchedulesResult {
export interface BulkUpdateTaskResult {
/**
* list of successfully updated tasks
*/
@ -126,6 +126,7 @@ export class TaskScheduling {
return await this.store.schedule({
...modifiedTask,
traceparent: traceparent || '',
enabled: modifiedTask.enabled ?? true,
});
}
@ -149,13 +150,72 @@ export class TaskScheduling {
...options,
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
return { ...modifiedTask, traceparent: traceparent || '' };
return {
...modifiedTask,
traceparent: traceparent || '',
enabled: modifiedTask.enabled ?? true,
};
})
);
return await this.store.bulkSchedule(modifiedTasks);
}
public async bulkEnableDisable(
taskIds: string[],
enabled: boolean
): Promise<BulkUpdateTaskResult> {
const tasks = await pMap(
chunk(taskIds, BULK_ACTION_SIZE),
async (taskIdsChunk) =>
this.store.fetch({
query: {
bool: {
must: [
{
terms: {
_id: taskIdsChunk.map((taskId) => `task:${taskId}`),
},
},
{
term: {
'task.enabled': !enabled,
},
},
],
},
},
size: BULK_ACTION_SIZE,
}),
{ concurrency: 10 }
);
const updatedTasks = tasks
.flatMap(({ docs }) => docs)
.reduce<ConcreteTaskInstance[]>((acc, task) => {
// if task is not enabled, no need to update it
if (enabled === task.enabled) {
return acc;
}
acc.push({ ...task, enabled });
return acc;
}, []);
return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateTaskResult>(
(acc, task) => {
if (task.tag === 'ok') {
acc.tasks.push(task.value);
} else {
acc.errors.push({ error: task.error.error, task: task.error.entity });
}
return acc;
},
{ tasks: [], errors: [] }
);
}
/**
* Bulk updates schedules for tasks by ids.
* Only tasks with `idle` status will be updated, as for the tasks which have `running` status,
@ -163,14 +223,14 @@ export class TaskScheduling {
*
* @param {string[]} taskIds - list of task ids
* @param {IntervalSchedule} schedule - new schedule
* @returns {Promise<BulkUpdateSchedulesResult>}
* @returns {Promise<BulkUpdateTaskResult>}
*/
public async bulkUpdateSchedules(
taskIds: string[],
schedule: IntervalSchedule
): Promise<BulkUpdateSchedulesResult> {
): Promise<BulkUpdateTaskResult> {
const tasks = await pMap(
chunk(taskIds, 100),
chunk(taskIds, BULK_ACTION_SIZE),
async (taskIdsChunk) =>
this.store.fetch({
query: mustBeAllOf(
@ -185,7 +245,7 @@ export class TaskScheduling {
},
}
),
size: 100,
size: BULK_ACTION_SIZE,
}),
{ concurrency: 10 }
);
@ -211,7 +271,7 @@ export class TaskScheduling {
return acc;
}, []);
return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateSchedulesResult>(
return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateTaskResult>(
(acc, task) => {
if (task.tag === 'ok') {
acc.tasks.push(task.value);
@ -226,7 +286,7 @@ export class TaskScheduling {
}
/**
* Run task.
* Run task.
*
* @param taskId - The task being scheduled.
* @returns {Promise<RunSoonResult>}

View file

@ -20,6 +20,48 @@ export class TaskManagerUtils {
this.retry = retry;
}
async waitForDisabled(id: string, taskRunAtFilter: Date) {
return await this.retry.try(async () => {
const searchResult = await this.es.search({
index: '.kibana_task_manager',
body: {
query: {
bool: {
must: [
{
term: {
'task.id': `task:${id}`,
},
},
{
terms: {
'task.scope': ['actions', 'alerting'],
},
},
{
range: {
'task.scheduledAt': {
gte: taskRunAtFilter.getTime().toString(),
},
},
},
{
term: {
'task.enabled': true,
},
},
],
},
},
},
});
// @ts-expect-error
if (searchResult.hits.total.value) {
// @ts-expect-error
throw new Error(`Expected 0 tasks but received ${searchResult.hits.total.value}`);
}
});
}
async waitForEmpty(taskRunAtFilter: Date) {
return await this.retry.try(async () => {
const searchResult = await this.es.search({

View file

@ -137,6 +137,7 @@ export default function createAlertTests({ getService }: FtrProviderContext) {
spaceId: space.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(true);
// Ensure AAD isn't broken
await checkAAD({
supertest,

View file

@ -16,6 +16,7 @@ import {
ObjectRemover,
getConsumerUnauthorizedErrorMessage,
getProducerUnauthorizedErrorMessage,
TaskManagerDoc,
} from '../../../../common/lib';
// eslint-disable-next-line import/no-default-export
@ -30,11 +31,12 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
after(() => objectRemover.removeAll());
async function getScheduledTask(id: string) {
return await es.get({
async function getScheduledTask(id: string): Promise<TaskManagerDoc> {
const scheduledTask = await es.get<TaskManagerDoc>({
id: `task:${id}`,
index: '.kibana_task_manager',
});
return scheduledTask._source!;
}
for (const scenario of UserAtSpaceScenarios) {
@ -88,8 +90,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
),
statusCode: 403,
});
// Ensure task still exists
await getScheduledTask(createdAlert.scheduled_task_id);
// Ensure task still exists and is still enabled
const taskRecord1 = await getScheduledTask(createdAlert.scheduled_task_id);
expect(taskRecord1.type).to.eql('task');
expect(taskRecord1.task.taskType).to.eql('alerting:test.noop');
expect(JSON.parse(taskRecord1.task.params)).to.eql({
alertId: createdAlert.id,
spaceId: space.id,
consumer: 'alertsFixture',
});
expect(taskRecord1.task.enabled).to.eql(true);
break;
case 'space_1_all_alerts_none_actions at space1':
case 'superuser at space1':
@ -97,12 +107,17 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
case 'space_1_all_with_restricted_fixture at space1':
expect(response.statusCode).to.eql(204);
expect(response.body).to.eql('');
try {
await getScheduledTask(createdAlert.scheduled_task_id);
throw new Error('Should have removed scheduled task');
} catch (e) {
expect(e.meta.statusCode).to.eql(404);
}
// task should still exist but be disabled
const taskRecord2 = await getScheduledTask(createdAlert.scheduled_task_id);
expect(taskRecord2.type).to.eql('task');
expect(taskRecord2.task.taskType).to.eql('alerting:test.noop');
expect(JSON.parse(taskRecord2.task.params)).to.eql({
alertId: createdAlert.id,
spaceId: space.id,
consumer: 'alertsFixture',
});
expect(taskRecord2.task.enabled).to.eql(false);
// Ensure AAD isn't broken
await checkAAD({
supertest,
@ -153,12 +168,17 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
case 'space_1_all_with_restricted_fixture at space1':
expect(response.statusCode).to.eql(204);
expect(response.body).to.eql('');
try {
await getScheduledTask(createdAlert.scheduled_task_id);
throw new Error('Should have removed scheduled task');
} catch (e) {
expect(e.meta.statusCode).to.eql(404);
}
// task should still exist but be disabled
const taskRecord = await getScheduledTask(createdAlert.scheduled_task_id);
expect(taskRecord.type).to.eql('task');
expect(taskRecord.task.taskType).to.eql('alerting:test.restricted-noop');
expect(JSON.parse(taskRecord.task.params)).to.eql({
alertId: createdAlert.id,
spaceId: space.id,
consumer: 'alertsRestrictedFixture',
});
expect(taskRecord.task.enabled).to.eql(false);
break;
default:
throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`);
@ -213,12 +233,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
case 'space_1_all_with_restricted_fixture at space1':
expect(response.statusCode).to.eql(204);
expect(response.body).to.eql('');
try {
await getScheduledTask(createdAlert.scheduled_task_id);
throw new Error('Should have removed scheduled task');
} catch (e) {
expect(e.meta.statusCode).to.eql(404);
}
// task should still exist but be disabled
const taskRecord = await getScheduledTask(createdAlert.scheduled_task_id);
expect(taskRecord.type).to.eql('task');
expect(taskRecord.task.taskType).to.eql('alerting:test.unrestricted-noop');
expect(JSON.parse(taskRecord.task.params)).to.eql({
alertId: createdAlert.id,
spaceId: space.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(false);
break;
default:
throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`);
@ -269,12 +293,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
case 'space_1_all_with_restricted_fixture at space1':
expect(response.statusCode).to.eql(204);
expect(response.body).to.eql('');
try {
await getScheduledTask(createdAlert.scheduled_task_id);
throw new Error('Should have removed scheduled task');
} catch (e) {
expect(e.meta.statusCode).to.eql(404);
}
// task should still exist but be disabled
const taskRecord = await getScheduledTask(createdAlert.scheduled_task_id);
expect(taskRecord.type).to.eql('task');
expect(taskRecord.task.taskType).to.eql('alerting:test.noop');
expect(JSON.parse(taskRecord.task.params)).to.eql({
alertId: createdAlert.id,
spaceId: space.id,
consumer: 'alerts',
});
expect(taskRecord.task.enabled).to.eql(false);
break;
default:
throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`);
@ -319,8 +347,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
),
statusCode: 403,
});
// Ensure task still exists
await getScheduledTask(createdAlert.scheduled_task_id);
// Ensure task still exists and is still enabled
const taskRecord1 = await getScheduledTask(createdAlert.scheduled_task_id);
expect(taskRecord1.type).to.eql('task');
expect(taskRecord1.task.taskType).to.eql('alerting:test.noop');
expect(JSON.parse(taskRecord1.task.params)).to.eql({
alertId: createdAlert.id,
spaceId: space.id,
consumer: 'alertsFixture',
});
expect(taskRecord1.task.enabled).to.eql(true);
break;
case 'superuser at space1':
case 'space_1_all at space1':
@ -328,12 +364,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
case 'space_1_all_with_restricted_fixture at space1':
expect(response.statusCode).to.eql(204);
expect(response.body).to.eql('');
try {
await getScheduledTask(createdAlert.scheduled_task_id);
throw new Error('Should have removed scheduled task');
} catch (e) {
expect(e.meta.statusCode).to.eql(404);
}
// task should still exist but be disabled
const taskRecord2 = await getScheduledTask(createdAlert.scheduled_task_id);
expect(taskRecord2.type).to.eql('task');
expect(taskRecord2.task.taskType).to.eql('alerting:test.noop');
expect(JSON.parse(taskRecord2.task.params)).to.eql({
alertId: createdAlert.id,
spaceId: space.id,
consumer: 'alertsFixture',
});
expect(taskRecord2.task.enabled).to.eql(false);
break;
default:
throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`);

View file

@ -129,6 +129,7 @@ export default function createEnableAlertTests({ getService }: FtrProviderContex
spaceId: space.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(true);
// Ensure AAD isn't broken
await checkAAD({
supertest,
@ -360,6 +361,7 @@ export default function createEnableAlertTests({ getService }: FtrProviderContex
spaceId: space.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(true);
// Ensure AAD isn't broken
await checkAAD({
supertest,

View file

@ -34,7 +34,7 @@ export default function alertTests({ getService }: FtrProviderContext) {
const esTestIndexTool = new ESTestIndexTool(es, retry);
const taskManagerUtils = new TaskManagerUtils(es, retry);
describe('alerts', () => {
describe('alerts test me', () => {
const authorizationIndex = '.kibana-test-authorization';
const objectRemover = new ObjectRemover(supertest);
@ -122,7 +122,7 @@ export default function alertTests({ getService }: FtrProviderContext) {
const alertId = response.body.id;
await alertUtils.disable(alertId);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(alertId, testStart);
// Ensure only 1 alert executed with proper params
const alertSearchResult = await esTestIndexTool.search(
@ -274,7 +274,7 @@ instanceStateValue: true
const alertId = response.body.id;
await alertUtils.disable(alertId);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(alertId, testStart);
// Ensure only 1 alert executed with proper params
const alertSearchResult = await esTestIndexTool.search(
@ -634,7 +634,7 @@ instanceStateValue: true
// Wait for test.authorization to index a document before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('alert:test.authorization', reference);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Ensure only 1 document exists with proper params
searchResult = await esTestIndexTool.search('alert:test.authorization', reference);
@ -665,7 +665,7 @@ instanceStateValue: true
// Wait for test.authorization to index a document before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('alert:test.authorization', reference);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Ensure only 1 document exists with proper params
searchResult = await esTestIndexTool.search('alert:test.authorization', reference);
@ -751,7 +751,7 @@ instanceStateValue: true
// Ensure test.authorization indexed 1 document before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('action:test.authorization', reference);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Ensure only 1 document with proper params exists
searchResult = await esTestIndexTool.search('action:test.authorization', reference);
@ -790,7 +790,7 @@ instanceStateValue: true
// Ensure test.authorization indexed 1 document before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('action:test.authorization', reference);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Ensure only 1 document with proper params exists
searchResult = await esTestIndexTool.search('action:test.authorization', reference);
@ -853,7 +853,7 @@ instanceStateValue: true
// Wait until alerts scheduled actions 3 times before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('alert:test.always-firing', reference, 3);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Ensure actions only executed once
const searchResult = await esTestIndexTool.search(
@ -933,7 +933,7 @@ instanceStateValue: true
// Wait for actions to execute twice before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('action:test.index-record', reference, 2);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Ensure only 2 actions with proper params exists
const searchResult = await esTestIndexTool.search(
@ -1009,7 +1009,7 @@ instanceStateValue: true
// Wait for actions to execute twice before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('action:test.index-record', reference, 2);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Ensure only 2 actions with proper params exists
const searchResult = await esTestIndexTool.search(
@ -1074,7 +1074,7 @@ instanceStateValue: true
// Actions should execute twice before widning things down
await esTestIndexTool.waitForDocs('action:test.index-record', reference, 2);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Ensure only 2 actions are executed
const searchResult = await esTestIndexTool.search(
@ -1133,7 +1133,7 @@ instanceStateValue: true
// execution once before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('alert:test.always-firing', reference, 2);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Should not have executed any action
const executedActionsResult = await esTestIndexTool.search(
@ -1192,7 +1192,7 @@ instanceStateValue: true
// once before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('alert:test.always-firing', reference, 2);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Should not have executed any action
const executedActionsResult = await esTestIndexTool.search(
@ -1252,7 +1252,7 @@ instanceStateValue: true
// Ensure actions are executed once before disabling the alert and waiting for tasks to finish
await esTestIndexTool.waitForDocs('action:test.index-record', reference, 1);
await alertUtils.disable(response.body.id);
await taskManagerUtils.waitForEmpty(testStart);
await taskManagerUtils.waitForDisabled(response.body.id, testStart);
// Should have one document indexed by the action
const searchResult = await esTestIndexTool.search(

View file

@ -108,6 +108,7 @@ export default function createAlertTests({ getService }: FtrProviderContext) {
spaceId: Spaces.space1.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(true);
// Ensure AAD isn't broken
await checkAAD({
supertest,
@ -498,6 +499,7 @@ export default function createAlertTests({ getService }: FtrProviderContext) {
spaceId: Spaces.space1.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(true);
// Ensure AAD isn't broken
await checkAAD({
supertest,

View file

@ -15,6 +15,7 @@ import {
getTestRuleData,
ObjectRemover,
getEventLog,
TaskManagerDoc,
} from '../../../common/lib';
import { validateEvent } from './event_log';
@ -31,11 +32,12 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
after(() => objectRemover.removeAll());
async function getScheduledTask(id: string) {
return await es.get({
async function getScheduledTask(id: string): Promise<TaskManagerDoc> {
const scheduledTask = await es.get<TaskManagerDoc>({
id: `task:${id}`,
index: '.kibana_task_manager',
});
return scheduledTask._source!;
}
it('should handle disable rule request appropriately', async () => {
@ -48,12 +50,16 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
await ruleUtils.disable(createdRule.id);
try {
await getScheduledTask(createdRule.scheduled_task_id);
throw new Error('Should have removed scheduled task');
} catch (e) {
expect(e.meta.statusCode).to.eql(404);
}
// task doc should still exist but be disabled
const taskRecord = await getScheduledTask(createdRule.scheduled_task_id);
expect(taskRecord.type).to.eql('task');
expect(taskRecord.task.taskType).to.eql('alerting:test.noop');
expect(JSON.parse(taskRecord.task.params)).to.eql({
alertId: createdRule.id,
spaceId: Spaces.space1.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(false);
// Ensure AAD isn't broken
await checkAAD({
@ -188,12 +194,16 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
.set('kbn-xsrf', 'foo')
.expect(204);
try {
await getScheduledTask(createdRule.scheduled_task_id);
throw new Error('Should have removed scheduled task');
} catch (e) {
expect(e.meta.statusCode).to.eql(404);
}
// task doc should still exist but be disabled
const taskRecord = await getScheduledTask(createdRule.scheduled_task_id);
expect(taskRecord.type).to.eql('task');
expect(taskRecord.task.taskType).to.eql('alerting:test.noop');
expect(JSON.parse(taskRecord.task.params)).to.eql({
alertId: createdRule.id,
spaceId: Spaces.space1.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(false);
// Ensure AAD isn't broken
await checkAAD({

View file

@ -59,6 +59,7 @@ export default function createEnableAlertTests({ getService }: FtrProviderContex
spaceId: Spaces.space1.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(true);
// Ensure AAD isn't broken
await checkAAD({
@ -111,6 +112,7 @@ export default function createEnableAlertTests({ getService }: FtrProviderContex
spaceId: Spaces.space1.id,
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(true);
// Ensure AAD isn't broken
await checkAAD({

View file

@ -109,6 +109,7 @@ export default function createScheduledTaskIdTests({ getService }: FtrProviderCo
spaceId: 'default',
consumer: 'alertsFixture',
});
expect(taskRecord.task.enabled).to.eql(true);
});
});
}

View file

@ -136,5 +136,59 @@ export default function createGetTests({ getService }: FtrProviderContext) {
expect(response.body._source?.task.taskType).to.eql(`sampleTaskRemovedType`);
expect(response.body._source?.task.status).to.eql(`unrecognized`);
});
it('8.5.0 migrates active tasks to set enabled to true', async () => {
const response = await es.search<{ task: ConcreteTaskInstance }>(
{
index: '.kibana_task_manager',
size: 100,
body: {
query: {
match_all: {},
},
},
},
{
meta: true,
}
);
expect(response.statusCode).to.eql(200);
const tasks = response.body.hits.hits;
tasks
.filter(
(task) =>
task._source?.task.status !== 'failed' && task._source?.task.status !== 'unrecognized'
)
.forEach((task) => {
expect(task._source?.task.enabled).to.eql(true);
});
});
it('8.5.0 does not migrates failed and unrecognized', async () => {
const response = await es.search<{ task: ConcreteTaskInstance }>(
{
index: '.kibana_task_manager',
size: 100,
body: {
query: {
match_all: {},
},
},
},
{
meta: true,
}
);
expect(response.statusCode).to.eql(200);
const tasks = response.body.hits.hits;
tasks
.filter(
(task) =>
task._source?.task.status === 'failed' || task._source?.task.status === 'unrecognized'
)
.forEach((task) => {
expect(task._source?.task.enabled).to.be(undefined);
});
});
});
}

View file

@ -11,7 +11,7 @@ import expect from '@kbn/expect';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import TaskManagerMapping from '@kbn/task-manager-plugin/server/saved_objects/mappings.json';
import { DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config';
import { ConcreteTaskInstance, BulkUpdateSchedulesResult } from '@kbn/task-manager-plugin/server';
import { ConcreteTaskInstance, BulkUpdateTaskResult } from '@kbn/task-manager-plugin/server';
import { FtrProviderContext } from '../../ftr_provider_context';
const {
@ -184,7 +184,7 @@ export default function ({ getService }: FtrProviderContext) {
.set('kbn-xsrf', 'xxx')
.send({ taskIds, schedule })
.expect(200)
.then((response: { body: BulkUpdateSchedulesResult }) => response.body);
.then((response: { body: BulkUpdateTaskResult }) => response.body);
}
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139