[Response Ops][Task Manager] Run attack discovery at a lower priority (#218907)

## Summary

Issue: https://github.com/elastic/kibana/issues/216631

This PR adds a new priority called `normalLongRunning` that is slightly
lower than the normal task priority. This priority is applied to the
`attack-discovery` rule type. Unit and E2E tests are also added to
verify that the new priority is working as intended.


### Checklist
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
This commit is contained in:
Jiawei Wu 2025-06-24 07:45:32 -07:00 committed by GitHub
parent 796f233c61
commit 62fc123ba9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 308 additions and 11 deletions

View file

@ -19,6 +19,7 @@ import { alertsServiceMock } from './alerts_service/alerts_service.mock';
import { schema } from '@kbn/config-schema';
import type { RecoveredActionGroupId } from '../common';
import type { AlertingConfig } from './config';
import { TaskPriority } from '@kbn/task-manager-plugin/server';
import { DEFAULT_APP_CATEGORIES } from '@kbn/core/server';
const logger = loggingSystemMock.create().get();
@ -500,6 +501,79 @@ describe('Create Lifecycle', () => {
]);
});
test('allows RuleType to specify a priority', () => {
const ruleType: RuleType<never, never, never, never, never, 'default', 'backToAwesome', {}> =
{
id: 'test',
name: 'Test',
actionGroups: [
{
id: 'default',
name: 'Default',
},
],
defaultActionGroupId: 'default',
recoveryActionGroup: {
id: 'backToAwesome',
name: 'Back To Awesome',
},
priority: TaskPriority.NormalLongRunning,
executor: jest.fn(),
category: 'test',
producer: 'alerts',
solution: 'stack',
minimumLicenseRequired: 'basic',
isExportable: true,
validate: {
params: { validate: (params) => params },
},
};
const registry = new RuleTypeRegistry(ruleTypeRegistryParams);
registry.register(ruleType);
expect(registry.get('test').priority).toEqual(TaskPriority.NormalLongRunning);
expect(taskManager.registerTaskDefinitions).toHaveBeenCalledTimes(1);
expect(taskManager.registerTaskDefinitions.mock.calls[0][0]).toMatchObject({
'alerting:test': {
title: 'Test',
priority: TaskPriority.NormalLongRunning,
},
});
});
test('throws if RuleType priority provided is invalid', () => {
const ruleType: RuleType<never, never, never, never, never, 'default', 'backToAwesome', {}> =
{
id: 'test',
name: 'Test',
actionGroups: [
{
id: 'default',
name: 'Default',
},
],
defaultActionGroupId: 'default',
recoveryActionGroup: {
id: 'backToAwesome',
name: 'Back To Awesome',
},
priority: TaskPriority.Low as TaskPriority.Normal, // Have to cast to force this error case
executor: jest.fn(),
category: 'test',
producer: 'alerts',
solution: 'stack',
minimumLicenseRequired: 'basic',
isExportable: true,
validate: {
params: { validate: (params) => params },
},
};
const registry = new RuleTypeRegistry(ruleTypeRegistryParams);
expect(() => registry.register(ruleType)).toThrowError(
new Error(`Rule type \"test\" has invalid priority: 1.`)
);
});
test('throws if the custom recovery group is contained in the RuleType action groups', () => {
const ruleType: RuleType<
never,

View file

@ -14,7 +14,7 @@ import type { Logger } from '@kbn/core/server';
import type { LicensingPluginSetup } from '@kbn/licensing-plugin/server';
import type { RunContext, TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { stateSchemaByVersion } from '@kbn/alerting-state-types';
import { TaskCost } from '@kbn/task-manager-plugin/server/task';
import { TaskCost, TaskPriority } from '@kbn/task-manager-plugin/server/task';
import type { TaskRunnerFactory } from './task_runner';
import type {
RuleType,
@ -71,6 +71,7 @@ export interface RegistryRuleType
| 'defaultScheduleInterval'
| 'doesSetRecoveryContext'
| 'alerts'
| 'priority'
| 'internallyManaged'
> {
id: string;
@ -275,6 +276,20 @@ export class RuleTypeRegistry {
}
}
if (ruleType.priority) {
if (![TaskPriority.Normal, TaskPriority.NormalLongRunning].includes(ruleType.priority)) {
throw new Error(
i18n.translate('xpack.alerting.ruleTypeRegistry.register.invalidPriorityRuleTypeError', {
defaultMessage: 'Rule type "{id}" has invalid priority: {errorMessage}.',
values: {
id: ruleType.id,
errorMessage: ruleType.priority,
},
})
);
}
}
const normalizedRuleType = augmentActionGroupsWithReserved<
Params,
ExtractedParams,
@ -297,6 +312,7 @@ export class RuleTypeRegistry {
this.taskManager.registerTaskDefinitions({
[`alerting:${ruleType.id}`]: {
title: ruleType.name,
priority: ruleType.priority,
timeout: ruleType.ruleTaskTimeout,
stateSchemaByVersion,
createTaskRunner: (context: RunContext) =>
@ -406,6 +422,7 @@ export class RuleTypeRegistry {
).isValid,
hasAlertsMappings: !!_ruleType.alerts,
...(_ruleType.alerts ? { alerts: _ruleType.alerts } : {}),
...(_ruleType.priority ? { priority: _ruleType.priority } : {}),
validLegacyConsumers: _ruleType.validLegacyConsumers,
};

View file

@ -30,6 +30,7 @@ import type { DefaultAlert, FieldMap } from '@kbn/alerts-as-data-utils';
import type { Alert } from '@kbn/alerts-as-data-utils';
import type { ActionsApiRequestHandlerContext, ActionsClient } from '@kbn/actions-plugin/server';
import type { AlertsHealth, RuleTypeSolution } from '@kbn/alerting-types';
import type { TaskPriority } from '@kbn/task-manager-plugin/server';
import type { RuleTypeRegistry as OrigruleTypeRegistry } from './rule_type_registry';
import type { AlertingServerSetup, AlertingServerStart } from './plugin';
import type { RulesClient } from './rules_client';
@ -356,6 +357,11 @@ export interface RuleType<
*/
autoRecoverAlerts?: boolean;
getViewInAppRelativeUrl?: GetViewInAppRelativeUrlFn<Params>;
/**
* Task priority allowing for tasks to be ran at lower priority (NormalLongRunning vs Normal), defaults to
* normal priority.
*/
priority?: TaskPriority.Normal | TaskPriority.NormalLongRunning;
/**
* Indicates that the rule type is managed internally by a Kibana plugin.
* Alerts of internally managed rule types are not returned by the APIs and thus not shown in the alerts table.

View file

@ -21,6 +21,7 @@ export const DEFAULT_TIMEOUT = '5m';
export enum TaskPriority {
Low = 1,
NormalLongRunning = 40,
Normal = 50,
}

View file

@ -201,7 +201,7 @@ describe('taskTypeDictionary', () => {
};
expect(runsanitize).toThrowErrorMatchingInlineSnapshot(
`"Invalid priority \\"23\\". Priority must be one of Low => 1,Normal => 50"`
`"Invalid priority \\"23\\". Priority must be one of Low => 1,NormalLongRunning => 40,Normal => 50"`
);
});
});
@ -249,7 +249,7 @@ describe('taskTypeDictionary', () => {
},
});
expect(logger.error).toHaveBeenCalledWith(
`Could not sanitize task definitions: Invalid priority \"23\". Priority must be one of Low => 1,Normal => 50`
`Could not sanitize task definitions: Invalid priority \"23\". Priority must be one of Low => 1,NormalLongRunning => 40,Normal => 50`
);
expect(definitions.get('foo')).toEqual(undefined);
});

View file

@ -353,6 +353,36 @@ export class SampleTaskManagerFixturePlugin
refresh: true,
});
return {
state: { count },
schedule,
};
},
}),
},
normalLongRunningPriorityTask: {
title: 'Task used for testing long running priority claiming',
priority: TaskPriority.Low,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => ({
async run() {
const { state, schedule } = taskInstance;
const prevState = state || { count: 0 };
const count = (prevState.count || 0) + 1;
const [{ elasticsearch }] = await core.getStartServices();
await elasticsearch.client.asInternalUser.index({
index: '.kibana_task_manager_test_result',
body: {
type: 'task',
taskType: 'normalLongRunningPriorityTask',
taskId: taskInstance.id,
state: JSON.stringify(state),
ranAt: new Date(),
},
refresh: true,
});
return {
state: { count },
schedule,

View file

@ -22,6 +22,7 @@ export default function ({ getService }: FtrProviderContext) {
const TEST_TYPES = [
'sampleAdHocTaskTimingOut',
'lowPriorityTask',
'normalLongRunningPriorityTask',
'sampleOneTimeTaskThrowingError',
'sampleRecurringTaskTimingOut',
'sampleRecurringTaskWhichHangs',

View file

@ -125,6 +125,10 @@ export default function ({ getService }: FtrProviderContext) {
afterEach(async () => {
await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
// Timeout added here to ensure all tasks are claimed and therefore deleted
// for the next test case
await new Promise((r) => setTimeout(r, 10000));
await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
});
it('should claim low priority tasks if there is capacity', async () => {
@ -140,6 +144,13 @@ export default function ({ getService }: FtrProviderContext) {
})
);
}
tasksToSchedule.push(
scheduleTask({
taskType: 'normalLongRunningPriorityTask',
schedule: { interval: `1d` },
params: {},
})
);
tasksToSchedule.push(
scheduleTask({
taskType: 'lowPriorityTask',
@ -151,7 +162,7 @@ export default function ({ getService }: FtrProviderContext) {
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(tasks.length).to.eql(6);
expect(tasks.length).to.eql(7);
const taskIds = tasks.map((task) => task.id);
const taskDocs: RawDoc[] = [];
@ -165,6 +176,11 @@ export default function ({ getService }: FtrProviderContext) {
expect(
taskDocs.findIndex((taskDoc) => taskDoc._source.taskType === 'lowPriorityTask')
).to.be.greaterThan(-1);
expect(
taskDocs.findIndex(
(taskDoc) => taskDoc._source.taskType === 'normalLongRunningPriorityTask'
)
).to.be.greaterThan(-1);
});
});
@ -189,6 +205,61 @@ export default function ({ getService }: FtrProviderContext) {
params: {},
})
);
// schedule a normal long running priority task
tasksToSchedule.push(
scheduleTask({
taskType: 'normalLongRunningPriorityTask',
schedule: { interval: `1s` },
params: {},
})
);
const scheduledTasks = await Promise.all(tasksToSchedule);
// make sure all tasks get created
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(tasks.length).to.eql(12);
const taskIds = tasks.map((task) => task.id);
scheduledTasks.forEach((scheduledTask) => {
expect(taskIds).to.contain(scheduledTask.id);
});
});
// wait for 30 seconds to let the multiple task claiming cycles run
await new Promise((r) => setTimeout(r, 30000));
const lowPriorityDocs: RawDoc[] = await historyDocs({ taskType: 'lowPriorityTask' });
expect(lowPriorityDocs.length).to.eql(0);
const normalLongRunningDocs: RawDoc[] = await historyDocs({
taskType: 'normalLongRunningPriorityTask',
});
expect(normalLongRunningDocs.length).to.eql(0);
});
it('should not claim low priority tasks when there is no capacity due to normal long running tasks', async () => {
// schedule a bunch of normal priority tasks that run frequently
const tasksToSchedule = [];
for (let i = 0; i < 10; i++) {
tasksToSchedule.push(
scheduleTask({
taskType: 'normalLongRunningPriorityTask',
schedule: { interval: `1s` },
params: {},
})
);
}
// schedule a low priority task
tasksToSchedule.push(
scheduleTask({
taskType: 'lowPriorityTask',
schedule: { interval: `1s` },
params: {},
})
);
const scheduledTasks = await Promise.all(tasksToSchedule);
// make sure all tasks get created
@ -205,8 +276,8 @@ export default function ({ getService }: FtrProviderContext) {
// wait for 30 seconds to let the multiple task claiming cycles run
await new Promise((r) => setTimeout(r, 30000));
const docs: RawDoc[] = await historyDocs({ taskType: 'lowPriorityTask' });
expect(docs.length).to.eql(0);
const lowPriorityDocs: RawDoc[] = await historyDocs({ taskType: 'lowPriorityTask' });
expect(lowPriorityDocs.length).to.eql(0);
});
});
}

View file

@ -14,6 +14,7 @@ import {
import { getAttackDiscoveryScheduleType } from '.';
import { ATTACK_DISCOVERY_ALERTS_AAD_CONFIG } from '../constants';
import { TaskPriority } from '@kbn/task-manager-plugin/server';
describe('getAttackDiscoveryScheduleType', () => {
const mockLogger = loggerMock.create();
@ -39,6 +40,7 @@ describe('getAttackDiscoveryScheduleType', () => {
category: 'securitySolution',
producer: 'siem',
solution: 'security',
priority: TaskPriority.NormalLongRunning,
schemas: {
params: { type: 'zod', schema: AttackDiscoveryScheduleParams },
},

View file

@ -11,6 +11,7 @@ import {
AttackDiscoveryScheduleParams,
} from '@kbn/elastic-assistant-common';
import { TaskPriority } from '@kbn/task-manager-plugin/server';
import { ATTACK_DISCOVERY_ALERTS_AAD_CONFIG } from '../constants';
import { AttackDiscoveryExecutorOptions, AttackDiscoveryScheduleType } from '../types';
import { attackDiscoveryScheduleExecutor } from './executor';
@ -35,6 +36,7 @@ export const getAttackDiscoveryScheduleType = ({
category: DEFAULT_APP_CATEGORIES.security.id,
producer: 'siem',
solution: 'security',
priority: TaskPriority.NormalLongRunning,
validate: {
params: {
validate: (object: unknown) => {

View file

@ -253,6 +253,36 @@ export class SampleTaskManagerFixturePlugin
refresh: true,
});
return {
state: { count },
schedule,
};
},
}),
},
normalLongRunningPriorityTask: {
title: 'Task used for testing long running priority claiming',
priority: TaskPriority.NormalLongRunning,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => ({
async run() {
const { state, schedule } = taskInstance;
const prevState = state || { count: 0 };
const count = (prevState.count || 0) + 1;
const [{ elasticsearch }] = await core.getStartServices();
await elasticsearch.client.asInternalUser.index({
index: '.kibana_task_manager_test_result',
body: {
type: 'task',
taskType: 'normalLongRunningPriorityTask',
taskId: taskInstance.id,
state: JSON.stringify(state),
ranAt: new Date(),
},
refresh: true,
});
return {
state: { count },
schedule,

View file

@ -140,6 +140,13 @@ export default function ({ getService }: FtrProviderContext) {
})
);
}
tasksToSchedule.push(
scheduleTask({
taskType: 'normalLongRunningPriorityTask',
schedule: { interval: `1d` },
params: {},
})
);
tasksToSchedule.push(
scheduleTask({
taskType: 'lowPriorityTask',
@ -151,7 +158,7 @@ export default function ({ getService }: FtrProviderContext) {
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(tasks.length).to.eql(6);
expect(tasks.length).to.eql(7);
const taskIds = tasks.map((task) => task.id);
const taskDocs: RawDoc[] = [];
@ -165,6 +172,11 @@ export default function ({ getService }: FtrProviderContext) {
expect(
taskDocs.findIndex((taskDoc) => taskDoc._source.taskType === 'lowPriorityTask')
).to.be.greaterThan(-1);
expect(
taskDocs.findIndex(
(taskDoc) => taskDoc._source.taskType === 'normalLongRunningPriorityTask'
)
).to.be.greaterThan(-1);
});
});
@ -189,13 +201,20 @@ export default function ({ getService }: FtrProviderContext) {
params: {},
})
);
// schedule a normal long running priority task
tasksToSchedule.push(
scheduleTask({
taskType: 'normalLongRunningPriorityTask',
schedule: { interval: `1s` },
params: {},
})
);
const scheduledTasks = await Promise.all(tasksToSchedule);
// make sure all tasks get created
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(tasks.length).to.eql(11);
const taskIds = tasks.map((task) => task.id);
scheduledTasks.forEach((scheduledTask) => {
expect(taskIds).to.contain(scheduledTask.id);
@ -205,8 +224,52 @@ export default function ({ getService }: FtrProviderContext) {
// wait for 30 seconds to let the multiple task claiming cycles run
await new Promise((r) => setTimeout(r, 30000));
const docs: RawDoc[] = await historyDocs({ taskType: 'lowPriorityTask' });
expect(docs.length).to.eql(0);
const lowPriorityDocs: RawDoc[] = await historyDocs({ taskType: 'lowPriorityTask' });
expect(lowPriorityDocs.length).to.eql(0);
const normalLongRunningDocs: RawDoc[] = await historyDocs({
taskType: 'normalLongRunningPriorityTask',
});
expect(normalLongRunningDocs.length).to.eql(0);
});
it('should not claim low priority tasks when there is no capacity due to normal long running tasks', async () => {
// schedule a bunch of normal priority tasks that run frequently
const tasksToSchedule = [];
for (let i = 0; i < 10; i++) {
tasksToSchedule.push(
scheduleTask({
taskType: 'normalLongRunningPriorityTask',
schedule: { interval: `1s` },
params: {},
})
);
}
// schedule a low priority task
tasksToSchedule.push(
scheduleTask({
taskType: 'lowPriorityTask',
schedule: { interval: `1s` },
params: {},
})
);
const scheduledTasks = await Promise.all(tasksToSchedule);
// make sure all tasks get created
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
const taskIds = tasks.map((task) => task.id);
scheduledTasks.forEach((scheduledTask) => {
expect(taskIds).to.contain(scheduledTask.id);
});
});
// wait for 30 seconds to let the multiple task claiming cycles run
await new Promise((r) => setTimeout(r, 30000));
const lowPriorityDocs: RawDoc[] = await historyDocs({ taskType: 'lowPriorityTask' });
expect(lowPriorityDocs.length).to.eql(0);
});
});
}