[ResponseOps][Alerting] Optimize the scheduling of rule actions so it can happen in bulk (#137781)

* Adding bulk scheduler

* Updating bulk schedule

* Fixing failing tests

* Fixing test

* Adding bulk schedule tests

* Cleaning up enqueue function

* Using bulk getConnectors

* Removing empty line

* Update x-pack/plugins/actions/server/create_execute_function.ts

Co-authored-by: Mike Côté <mikecote@users.noreply.github.com>

* Update x-pack/plugins/actions/server/create_execute_function.ts

Co-authored-by: Mike Côté <mikecote@users.noreply.github.com>

* Cleaning up auth

* Fixing test failure

* Updating bulk auth changes

* Fixed track change

* Fixing test failures

* Addressing pr comments

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Mike Côté <mikecote@users.noreply.github.com>
This commit is contained in:
doakalexi 2022-08-12 16:13:06 -04:00 committed by GitHub
parent c868daad50
commit 4ddc26d262
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1536 additions and 165 deletions

View file

@ -23,6 +23,7 @@ const createActionsClientMock = () => {
execute: jest.fn(),
enqueueExecution: jest.fn(),
ephemeralEnqueuedExecution: jest.fn(),
bulkEnqueueExecution: jest.fn(),
listTypes: jest.fn(),
isActionTypeEnabled: jest.fn(),
isPreconfigured: jest.fn(),

View file

@ -32,6 +32,7 @@ import { ActionsAuthorization } from './authorization/actions_authorization';
import {
getAuthorizationModeBySource,
AuthorizationMode,
getBulkAuthorizationModeBySource,
} from './authorization/get_authorization_mode_by_source';
import { actionsAuthorizationMock } from './authorization/actions_authorization.mock';
import { trackLegacyRBACExemption } from './lib/track_legacy_rbac_exemption';
@ -59,6 +60,9 @@ jest.mock('./authorization/get_authorization_mode_by_source', () => {
getAuthorizationModeBySource: jest.fn(() => {
return 1;
}),
getBulkAuthorizationModeBySource: jest.fn(() => {
return 1;
}),
AuthorizationMode: {
Legacy: 0,
RBAC: 1,
@ -80,6 +84,7 @@ const actionExecutor = actionExecutorMock.create();
const authorization = actionsAuthorizationMock.create();
const executionEnqueuer = jest.fn();
const ephemeralExecutionEnqueuer = jest.fn();
const bulkExecutionEnqueuer = jest.fn();
const request = httpServerMock.createKibanaRequest();
const auditLogger = auditLoggerMock.create();
const mockUsageCountersSetup = usageCountersServiceMock.createSetupContract();
@ -124,6 +129,7 @@ beforeEach(() => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
auditLogger,
@ -550,6 +556,7 @@ describe('create()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
connectorTokenClient: connectorTokenClientMock.create(),
@ -655,6 +662,7 @@ describe('get()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -714,6 +722,7 @@ describe('get()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -835,6 +844,7 @@ describe('get()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -909,6 +919,7 @@ describe('getAll()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -1050,6 +1061,7 @@ describe('getAll()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -1131,6 +1143,7 @@ describe('getBulk()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -1266,6 +1279,7 @@ describe('getBulk()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -1324,6 +1338,7 @@ describe('getOAuthAccessToken()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -2326,6 +2341,119 @@ describe('enqueueExecution()', () => {
});
});
describe('bulkEnqueueExecution()', () => {
describe('authorization', () => {
test('ensures user is authorised to excecute actions', async () => {
(getBulkAuthorizationModeBySource as jest.Mock).mockImplementationOnce(() => {
return { [AuthorizationMode.RBAC]: 1, [AuthorizationMode.Legacy]: 0 };
});
await actionsClient.bulkEnqueueExecution([
{
id: uuid.v4(),
params: {},
spaceId: 'default',
executionId: '123abc',
apiKey: null,
},
{
id: uuid.v4(),
params: {},
spaceId: 'default',
executionId: '456def',
apiKey: null,
},
]);
expect(authorization.ensureAuthorized).toHaveBeenCalledWith('execute');
});
test('throws when user is not authorised to create the type of action', async () => {
(getBulkAuthorizationModeBySource as jest.Mock).mockImplementationOnce(() => {
return { [AuthorizationMode.RBAC]: 1, [AuthorizationMode.Legacy]: 0 };
});
authorization.ensureAuthorized.mockRejectedValue(
new Error(`Unauthorized to execute all actions`)
);
await expect(
actionsClient.bulkEnqueueExecution([
{
id: uuid.v4(),
params: {},
spaceId: 'default',
executionId: '123abc',
apiKey: null,
},
{
id: uuid.v4(),
params: {},
spaceId: 'default',
executionId: '456def',
apiKey: null,
},
])
).rejects.toMatchInlineSnapshot(`[Error: Unauthorized to execute all actions]`);
expect(authorization.ensureAuthorized).toHaveBeenCalledWith('execute');
});
test('tracks legacy RBAC', async () => {
(getBulkAuthorizationModeBySource as jest.Mock).mockImplementationOnce(() => {
return { [AuthorizationMode.RBAC]: 0, [AuthorizationMode.Legacy]: 2 };
});
await actionsClient.bulkEnqueueExecution([
{
id: uuid.v4(),
params: {},
spaceId: 'default',
executionId: '123abc',
apiKey: null,
},
{
id: uuid.v4(),
params: {},
spaceId: 'default',
executionId: '456def',
apiKey: null,
},
]);
expect(trackLegacyRBACExemption as jest.Mock).toBeCalledWith(
'bulkEnqueueExecution',
mockUsageCounter,
2
);
});
});
test('calls the bulkExecutionEnqueuer with the appropriate parameters', async () => {
(getBulkAuthorizationModeBySource as jest.Mock).mockImplementationOnce(() => {
return { [AuthorizationMode.RBAC]: 0, [AuthorizationMode.Legacy]: 0 };
});
const opts = [
{
id: uuid.v4(),
params: {},
spaceId: 'default',
executionId: '123abc',
apiKey: null,
},
{
id: uuid.v4(),
params: {},
spaceId: 'default',
executionId: '456def',
apiKey: null,
},
];
await expect(actionsClient.bulkEnqueueExecution(opts)).resolves.toMatchInlineSnapshot(
`undefined`
);
expect(bulkExecutionEnqueuer).toHaveBeenCalledWith(unsecuredSavedObjectsClient, opts);
});
});
describe('isActionTypeEnabled()', () => {
const fooActionType: ActionType = {
id: 'foo',
@ -2366,6 +2494,7 @@ describe('isPreconfigured()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [
@ -2403,6 +2532,7 @@ describe('isPreconfigured()', () => {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization: authorization as unknown as ActionsAuthorization,
preconfiguredActions: [

View file

@ -25,7 +25,13 @@ import { AuditLogger } from '@kbn/security-plugin/server';
import { RunNowResult } from '@kbn/task-manager-plugin/server';
import { ActionType } from '../common';
import { ActionTypeRegistry } from './action_type_registry';
import { validateConfig, validateSecrets, ActionExecutorContract, validateConnector } from './lib';
import {
validateConfig,
validateSecrets,
ActionExecutorContract,
validateConnector,
ActionExecutionSource,
} from './lib';
import {
ActionResult,
FindActionResult,
@ -39,10 +45,12 @@ import { ExecuteOptions } from './lib/action_executor';
import {
ExecutionEnqueuer,
ExecuteOptions as EnqueueExecutionOptions,
BulkExecutionEnqueuer,
} from './create_execute_function';
import { ActionsAuthorization } from './authorization/actions_authorization';
import {
getAuthorizationModeBySource,
getBulkAuthorizationModeBySource,
AuthorizationMode,
} from './authorization/get_authorization_mode_by_source';
import { connectorAuditEvent, ConnectorAuditAction } from './lib/audit_events';
@ -94,6 +102,7 @@ interface ConstructorOptions {
actionExecutor: ActionExecutorContract;
executionEnqueuer: ExecutionEnqueuer<void>;
ephemeralExecutionEnqueuer: ExecutionEnqueuer<RunNowResult>;
bulkExecutionEnqueuer: BulkExecutionEnqueuer<void>;
request: KibanaRequest;
authorization: ActionsAuthorization;
auditLogger?: AuditLogger;
@ -118,6 +127,7 @@ export class ActionsClient {
private readonly authorization: ActionsAuthorization;
private readonly executionEnqueuer: ExecutionEnqueuer<void>;
private readonly ephemeralExecutionEnqueuer: ExecutionEnqueuer<RunNowResult>;
private readonly bulkExecutionEnqueuer: BulkExecutionEnqueuer<void>;
private readonly auditLogger?: AuditLogger;
private readonly usageCounter?: UsageCounter;
private readonly connectorTokenClient: ConnectorTokenClientContract;
@ -132,6 +142,7 @@ export class ActionsClient {
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
bulkExecutionEnqueuer,
request,
authorization,
auditLogger,
@ -147,6 +158,7 @@ export class ActionsClient {
this.actionExecutor = actionExecutor;
this.executionEnqueuer = executionEnqueuer;
this.ephemeralExecutionEnqueuer = ephemeralExecutionEnqueuer;
this.bulkExecutionEnqueuer = bulkExecutionEnqueuer;
this.request = request;
this.authorization = authorization;
this.auditLogger = auditLogger;
@ -656,6 +668,30 @@ export class ActionsClient {
return this.executionEnqueuer(this.unsecuredSavedObjectsClient, options);
}
public async bulkEnqueueExecution(options: EnqueueExecutionOptions[]): Promise<void> {
const sources: Array<ActionExecutionSource<unknown>> = [];
options.forEach((option) => {
if (option.source) {
sources.push(option.source);
}
});
const authCounts = await getBulkAuthorizationModeBySource(
this.unsecuredSavedObjectsClient,
sources
);
if (authCounts[AuthorizationMode.RBAC] > 0) {
await this.authorization.ensureAuthorized('execute');
}
if (authCounts[AuthorizationMode.Legacy] > 0) {
trackLegacyRBACExemption(
'bulkEnqueueExecution',
this.usageCounter,
authCounts[AuthorizationMode.Legacy]
);
}
return this.bulkExecutionEnqueuer(this.unsecuredSavedObjectsClient, options);
}
public async ephemeralEnqueuedExecution(options: EnqueueExecutionOptions): Promise<RunNowResult> {
const { source } = options;
if (

View file

@ -7,6 +7,7 @@
import {
getAuthorizationModeBySource,
getBulkAuthorizationModeBySource,
AuthorizationMode,
} from './get_authorization_mode_by_source';
import { savedObjectsClientMock } from '@kbn/core/server/mocks';
@ -95,6 +96,90 @@ describe(`#getAuthorizationModeBySource`, () => {
});
});
describe(`#getBulkAuthorizationModeBySource`, () => {
test('should return RBAC if no source is provided', async () => {
unsecuredSavedObjectsClient.bulkGet.mockResolvedValue({ saved_objects: [] });
expect(await getBulkAuthorizationModeBySource(unsecuredSavedObjectsClient)).toEqual({
[AuthorizationMode.RBAC]: 1,
[AuthorizationMode.Legacy]: 0,
});
});
test('should return RBAC if source is not an alert', async () => {
unsecuredSavedObjectsClient.bulkGet.mockResolvedValue({ saved_objects: [] });
expect(
await getBulkAuthorizationModeBySource(unsecuredSavedObjectsClient, [
asSavedObjectExecutionSource({
type: 'action',
id: uuid.v4(),
}),
])
).toEqual({ [AuthorizationMode.RBAC]: 1, [AuthorizationMode.Legacy]: 0 });
});
test('should return RBAC if source alert is not marked as legacy', async () => {
const id = uuid.v4();
unsecuredSavedObjectsClient.bulkGet.mockResolvedValue({ saved_objects: [mockAlert({ id })] });
expect(
await getBulkAuthorizationModeBySource(unsecuredSavedObjectsClient, [
asSavedObjectExecutionSource({
type: 'alert',
id,
}),
])
).toEqual({ [AuthorizationMode.RBAC]: 1, [AuthorizationMode.Legacy]: 0 });
});
test('should return Legacy if source alert is marked as legacy', async () => {
const id = uuid.v4();
unsecuredSavedObjectsClient.bulkGet.mockResolvedValue({
saved_objects: [
mockAlert({ id, attributes: { meta: { versionApiKeyLastmodified: 'pre-7.10.0' } } }),
],
});
expect(
await getBulkAuthorizationModeBySource(unsecuredSavedObjectsClient, [
asSavedObjectExecutionSource({
type: 'alert',
id,
}),
])
).toEqual({ [AuthorizationMode.RBAC]: 0, [AuthorizationMode.Legacy]: 1 });
});
test('should return RBAC if source alert is marked as modern', async () => {
const id = uuid.v4();
unsecuredSavedObjectsClient.bulkGet.mockResolvedValue({
saved_objects: [
mockAlert({ id, attributes: { meta: { versionApiKeyLastmodified: '7.10.0' } } }),
],
});
expect(
await getBulkAuthorizationModeBySource(unsecuredSavedObjectsClient, [
asSavedObjectExecutionSource({
type: 'alert',
id,
}),
])
).toEqual({ [AuthorizationMode.RBAC]: 1, [AuthorizationMode.Legacy]: 0 });
});
test('should return RBAC if source alert doesnt have a last modified version', async () => {
const id = uuid.v4();
unsecuredSavedObjectsClient.bulkGet.mockResolvedValue({
saved_objects: [mockAlert({ id, attributes: { meta: {} } })],
});
expect(
await getBulkAuthorizationModeBySource(unsecuredSavedObjectsClient, [
asSavedObjectExecutionSource({
type: 'alert',
id,
}),
])
).toEqual({ [AuthorizationMode.RBAC]: 1, [AuthorizationMode.Legacy]: 0 });
});
});
const mockAlert = (overrides: Record<string, unknown> = {}) => ({
id: '1',
type: 'alert',

View file

@ -6,6 +6,7 @@
*/
import { SavedObjectsClientContract } from '@kbn/core/server';
import { get } from 'lodash';
import { ActionExecutionSource, isSavedObjectExecutionSource } from '../lib';
import { ALERT_SAVED_OBJECT_TYPE } from '../constants/saved_objects';
@ -32,3 +33,40 @@ export async function getAuthorizationModeBySource(
? AuthorizationMode.Legacy
: AuthorizationMode.RBAC;
}
export async function getBulkAuthorizationModeBySource(
unsecuredSavedObjectsClient: SavedObjectsClientContract,
executionSources: Array<ActionExecutionSource<unknown>> = []
): Promise<Record<string, number>> {
const count = { [AuthorizationMode.Legacy]: 0, [AuthorizationMode.RBAC]: 0 };
if (executionSources.length === 0) {
count[AuthorizationMode.RBAC] = 1;
return count;
}
const alerts = await unsecuredSavedObjectsClient.bulkGet<{
meta?: {
versionApiKeyLastmodified?: string;
};
}>(
executionSources.map((es) => ({
type: ALERT_SAVED_OBJECT_TYPE,
id: get(es, 'source.id'),
}))
);
const legacyVersions: Record<string, boolean> = alerts.saved_objects.reduce(
(acc, so) => ({
...acc,
[so.id]: so.attributes.meta?.versionApiKeyLastmodified === LEGACY_VERSION,
}),
{}
);
return executionSources.reduce((acc, es) => {
const isAlertSavedObject =
isSavedObjectExecutionSource(es) && es.source?.type === ALERT_SAVED_OBJECT_TYPE;
const isLegacyVersion = legacyVersions[get(es, 'source.id')];
const key =
isAlertSavedObject && isLegacyVersion ? AuthorizationMode.Legacy : AuthorizationMode.RBAC;
acc[key]++;
return acc;
}, count);
}

View file

@ -8,7 +8,10 @@
import { KibanaRequest } from '@kbn/core/server';
import uuid from 'uuid';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import { createExecutionEnqueuerFunction } from './create_execute_function';
import {
createExecutionEnqueuerFunction,
createBulkExecutionEnqueuerFunction,
} from './create_execute_function';
import { savedObjectsClientMock } from '@kbn/core/server/mocks';
import { actionTypeRegistryMock } from './action_type_registry.mock';
import {
@ -539,3 +542,614 @@ describe('execute()', () => {
expect(mockedActionTypeRegistry.ensureActionTypeEnabled).not.toHaveBeenCalled();
});
});
describe('bulkExecute()', () => {
test('schedules the action with all given parameters', async () => {
const actionTypeRegistry = actionTypeRegistryMock.create();
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
actionTypeRegistry,
isESOCanEncrypt: true,
preconfiguredActions: [],
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
{
id: '123',
type: 'action',
attributes: {
actionTypeId: 'mock-action',
},
references: [],
},
],
});
savedObjectsClient.bulkCreate.mockResolvedValueOnce({
saved_objects: [
{
id: '234',
type: 'action_task_params',
attributes: {
actionId: '123',
},
references: [],
},
],
});
await executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
source: asHttpRequestExecutionSource(request),
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
expect(mockTaskManager.bulkSchedule.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Object {
"params": Object {
"actionTaskParamsId": "234",
"spaceId": "default",
},
"scope": Array [
"actions",
],
"state": Object {},
"taskType": "actions:mock-action",
},
],
]
`);
expect(savedObjectsClient.bulkGet).toHaveBeenCalledWith([{ id: '123', type: 'action' }]);
expect(savedObjectsClient.bulkCreate).toHaveBeenCalledWith([
{
type: 'action_task_params',
attributes: {
actionId: '123',
params: { baz: false },
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '123',
name: 'actionRef',
type: 'action',
},
],
},
]);
expect(actionTypeRegistry.isActionExecutable).toHaveBeenCalledWith('123', 'mock-action', {
notifyUsage: true,
});
});
test('schedules the action with all given parameters and consumer', async () => {
const actionTypeRegistry = actionTypeRegistryMock.create();
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
actionTypeRegistry,
isESOCanEncrypt: true,
preconfiguredActions: [],
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
{
id: '123',
type: 'action',
attributes: {
actionTypeId: 'mock-action',
},
references: [],
},
],
});
savedObjectsClient.bulkCreate.mockResolvedValueOnce({
saved_objects: [
{
id: '234',
type: 'action_task_params',
attributes: {
actionId: '123',
consumer: 'test-consumer',
},
references: [],
},
],
});
await executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
executionId: '123abc',
consumer: 'test-consumer',
apiKey: Buffer.from('123:abc').toString('base64'),
source: asHttpRequestExecutionSource(request),
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
expect(mockTaskManager.bulkSchedule.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Object {
"params": Object {
"actionTaskParamsId": "234",
"spaceId": "default",
},
"scope": Array [
"actions",
],
"state": Object {},
"taskType": "actions:mock-action",
},
],
]
`);
expect(savedObjectsClient.bulkGet).toHaveBeenCalledWith([{ id: '123', type: 'action' }]);
expect(savedObjectsClient.bulkCreate).toHaveBeenCalledWith([
{
type: 'action_task_params',
attributes: {
actionId: '123',
params: { baz: false },
executionId: '123abc',
consumer: 'test-consumer',
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '123',
name: 'actionRef',
type: 'action',
},
],
},
]);
expect(actionTypeRegistry.isActionExecutable).toHaveBeenCalledWith('123', 'mock-action', {
notifyUsage: true,
});
});
test('schedules the action with all given parameters and relatedSavedObjects', async () => {
const actionTypeRegistry = actionTypeRegistryMock.create();
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
actionTypeRegistry,
isESOCanEncrypt: true,
preconfiguredActions: [],
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
{
id: '123',
type: 'action',
attributes: {
actionTypeId: 'mock-action',
},
references: [],
},
],
});
savedObjectsClient.bulkCreate.mockResolvedValueOnce({
saved_objects: [
{
id: '234',
type: 'action_task_params',
attributes: {},
references: [],
},
],
});
await executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
apiKey: Buffer.from('123:abc').toString('base64'),
source: asHttpRequestExecutionSource(request),
executionId: '123abc',
relatedSavedObjects: [
{
id: 'some-id',
namespace: 'some-namespace',
type: 'some-type',
typeId: 'some-typeId',
},
],
},
]);
expect(savedObjectsClient.bulkCreate).toHaveBeenCalledWith([
{
type: 'action_task_params',
attributes: {
actionId: '123',
params: { baz: false },
apiKey: Buffer.from('123:abc').toString('base64'),
executionId: '123abc',
relatedSavedObjects: [
{
id: 'related_some-type_0',
namespace: 'some-namespace',
type: 'some-type',
typeId: 'some-typeId',
},
],
},
references: [
{
id: '123',
name: 'actionRef',
type: 'action',
},
{
id: 'some-id',
name: 'related_some-type_0',
type: 'some-type',
},
],
},
]);
});
test('schedules the action with all given parameters with a preconfigured action', async () => {
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
actionTypeRegistry: actionTypeRegistryMock.create(),
isESOCanEncrypt: true,
preconfiguredActions: [
{
id: '123',
actionTypeId: 'mock-action-preconfigured',
config: {},
isPreconfigured: true,
isDeprecated: false,
name: 'x',
secrets: {},
},
],
});
const source = { type: 'alert', id: uuid.v4() };
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
{
id: '123',
type: 'action',
attributes: {
actionTypeId: 'mock-action',
},
references: [],
},
],
});
savedObjectsClient.bulkCreate.mockResolvedValueOnce({
saved_objects: [
{
id: '234',
type: 'action_task_params',
attributes: {
actionId: '123',
},
references: [],
},
],
});
await executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
source: asSavedObjectExecutionSource(source),
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
expect(mockTaskManager.bulkSchedule.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Object {
"params": Object {
"actionTaskParamsId": "234",
"spaceId": "default",
},
"scope": Array [
"actions",
],
"state": Object {},
"taskType": "actions:mock-action-preconfigured",
},
],
]
`);
expect(savedObjectsClient.get).not.toHaveBeenCalled();
expect(savedObjectsClient.bulkCreate).toHaveBeenCalledWith([
{
type: 'action_task_params',
attributes: {
actionId: '123',
params: { baz: false },
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: source.id,
name: 'source',
type: source.type,
},
],
},
]);
});
test('schedules the action with all given parameters with a preconfigured action and relatedSavedObjects', async () => {
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
actionTypeRegistry: actionTypeRegistryMock.create(),
isESOCanEncrypt: true,
preconfiguredActions: [
{
id: '123',
actionTypeId: 'mock-action-preconfigured',
config: {},
isPreconfigured: true,
isDeprecated: false,
name: 'x',
secrets: {},
},
],
});
const source = { type: 'alert', id: uuid.v4() };
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
{
id: '123',
type: 'action',
attributes: {
actionTypeId: 'mock-action',
},
references: [],
},
],
});
savedObjectsClient.bulkCreate.mockResolvedValueOnce({
saved_objects: [
{
id: '234',
type: 'action_task_params',
attributes: {
actionId: '123',
},
references: [],
},
],
});
await executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
apiKey: Buffer.from('123:abc').toString('base64'),
source: asSavedObjectExecutionSource(source),
executionId: '123abc',
relatedSavedObjects: [
{
id: 'some-id',
namespace: 'some-namespace',
type: 'some-type',
typeId: 'some-typeId',
},
],
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
expect(mockTaskManager.bulkSchedule.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Object {
"params": Object {
"actionTaskParamsId": "234",
"spaceId": "default",
},
"scope": Array [
"actions",
],
"state": Object {},
"taskType": "actions:mock-action-preconfigured",
},
],
]
`);
expect(savedObjectsClient.get).not.toHaveBeenCalled();
expect(savedObjectsClient.bulkCreate).toHaveBeenCalledWith([
{
type: 'action_task_params',
attributes: {
actionId: '123',
params: { baz: false },
apiKey: Buffer.from('123:abc').toString('base64'),
executionId: '123abc',
relatedSavedObjects: [
{
id: 'related_some-type_0',
namespace: 'some-namespace',
type: 'some-type',
typeId: 'some-typeId',
},
],
},
references: [
{
id: source.id,
name: 'source',
type: source.type,
},
{
id: 'some-id',
name: 'related_some-type_0',
type: 'some-type',
},
],
},
]);
});
test('throws when passing isESOCanEncrypt with false as a value', async () => {
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
isESOCanEncrypt: false,
actionTypeRegistry: actionTypeRegistryMock.create(),
preconfiguredActions: [],
});
await expect(
executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
executionId: '123abc',
apiKey: null,
},
])
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Unable to execute actions because the Encrypted Saved Objects plugin is missing encryption key. Please set xpack.encryptedSavedObjects.encryptionKey in the kibana.yml or use the bin/kibana-encryption-keys command."`
);
});
test('throws when isMissingSecrets is true for connector', async () => {
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
isESOCanEncrypt: true,
actionTypeRegistry: actionTypeRegistryMock.create(),
preconfiguredActions: [],
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
{
id: '123',
type: 'action',
attributes: {
name: 'mock-action',
isMissingSecrets: true,
actionTypeId: 'mock-action',
},
references: [],
},
],
});
await expect(
executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
executionId: '123abc',
apiKey: null,
},
])
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Unable to execute action because no secrets are defined for the \\"mock-action\\" connector."`
);
});
test('should ensure action type is enabled', async () => {
const mockedActionTypeRegistry = actionTypeRegistryMock.create();
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
isESOCanEncrypt: true,
actionTypeRegistry: mockedActionTypeRegistry,
preconfiguredActions: [],
});
mockedActionTypeRegistry.ensureActionTypeEnabled.mockImplementation(() => {
throw new Error('Fail');
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
{
id: '123',
type: 'action',
attributes: {
actionTypeId: 'mock-action',
},
references: [],
},
],
});
await expect(
executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
executionId: '123abc',
apiKey: null,
},
])
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`);
});
test('should skip ensure action type if action type is preconfigured and license is valid', async () => {
const mockedActionTypeRegistry = actionTypeRegistryMock.create();
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
isESOCanEncrypt: true,
actionTypeRegistry: mockedActionTypeRegistry,
preconfiguredActions: [
{
actionTypeId: 'mock-action',
config: {},
id: 'my-slack1',
name: 'Slack #xyz',
secrets: {},
isPreconfigured: true,
isDeprecated: false,
},
],
});
mockedActionTypeRegistry.isActionExecutable.mockImplementation(() => true);
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
{
id: '123',
type: 'action',
attributes: {
actionTypeId: 'mock-action',
},
references: [],
},
],
});
savedObjectsClient.bulkCreate.mockResolvedValueOnce({
saved_objects: [
{
id: '234',
type: 'action_task_params',
attributes: {
actionId: '123',
},
references: [],
},
],
});
await executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
executionId: '123abc',
apiKey: null,
},
]);
expect(mockedActionTypeRegistry.ensureActionTypeEnabled).not.toHaveBeenCalled();
});
});

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { SavedObjectsClientContract } from '@kbn/core/server';
import { SavedObjectsBulkResponse, SavedObjectsClientContract } from '@kbn/core/server';
import { RunNowResult, TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import {
RawAction,
@ -34,11 +34,30 @@ export interface ExecuteOptions extends Pick<ActionExecutorOptions, 'params' | '
relatedSavedObjects?: RelatedSavedObjects;
}
export interface ActionTaskParams extends Pick<ActionExecutorOptions, 'params'> {
actionId: string;
apiKey: string | null;
executionId: string;
consumer?: string;
relatedSavedObjects?: RelatedSavedObjects;
}
export interface GetConnectorsResult {
connector: PreConfiguredAction | RawAction;
isPreconfigured: boolean;
id: string;
}
export type ExecutionEnqueuer<T> = (
unsecuredSavedObjectsClient: SavedObjectsClientContract,
options: ExecuteOptions
) => Promise<T>;
export type BulkExecutionEnqueuer<T> = (
unsecuredSavedObjectsClient: SavedObjectsClientContract,
actionsToExectute: ExecuteOptions[]
) => Promise<T>;
export function createExecutionEnqueuerFunction({
taskManager,
actionTypeRegistry,
@ -119,6 +138,99 @@ export function createExecutionEnqueuerFunction({
};
}
export function createBulkExecutionEnqueuerFunction({
taskManager,
actionTypeRegistry,
isESOCanEncrypt,
preconfiguredActions,
}: CreateExecuteFunctionOptions): BulkExecutionEnqueuer<void> {
return async function execute(
unsecuredSavedObjectsClient: SavedObjectsClientContract,
actionsToExecute: ExecuteOptions[]
) {
if (!isESOCanEncrypt) {
throw new Error(
`Unable to execute actions because the Encrypted Saved Objects plugin is missing encryption key. Please set xpack.encryptedSavedObjects.encryptionKey in the kibana.yml or use the bin/kibana-encryption-keys command.`
);
}
const actionTypeIds: Record<string, string> = {};
const spaceIds: Record<string, string> = {};
const connectorIsPreconfigured: Record<string, boolean> = {};
const connectorIds = [...new Set(actionsToExecute.map((action) => action.id))];
const connectors = await getConnectors(
unsecuredSavedObjectsClient,
preconfiguredActions,
connectorIds
);
connectors.forEach((c) => {
const { id, connector, isPreconfigured } = c;
validateCanActionBeUsed(connector);
const { actionTypeId } = connector;
if (!actionTypeRegistry.isActionExecutable(id, actionTypeId, { notifyUsage: true })) {
actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
}
actionTypeIds[id] = actionTypeId;
connectorIsPreconfigured[id] = isPreconfigured;
});
const actions = await Promise.all(
actionsToExecute.map(async (actionToExecute) => {
// Get saved object references from action ID and relatedSavedObjects
const { references, relatedSavedObjectWithRefs } = extractSavedObjectReferences(
actionToExecute.id,
connectorIsPreconfigured[actionToExecute.id],
actionToExecute.relatedSavedObjects
);
const executionSourceReference = executionSourceAsSavedObjectReferences(
actionToExecute.source
);
const taskReferences = [];
if (executionSourceReference.references) {
taskReferences.push(...executionSourceReference.references);
}
if (references) {
taskReferences.push(...references);
}
spaceIds[actionToExecute.id] = actionToExecute.spaceId;
return {
type: ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE,
attributes: {
actionId: actionToExecute.id,
params: actionToExecute.params,
apiKey: actionToExecute.apiKey,
executionId: actionToExecute.executionId,
consumer: actionToExecute.consumer,
relatedSavedObjects: relatedSavedObjectWithRefs,
},
references: taskReferences,
};
})
);
const actionTaskParamsRecords: SavedObjectsBulkResponse<ActionTaskParams> =
await unsecuredSavedObjectsClient.bulkCreate(actions);
const taskInstances = actionTaskParamsRecords.saved_objects.map((so) => {
const actionId = so.attributes.actionId;
return {
taskType: `actions:${actionTypeIds[actionId]}`,
params: {
spaceId: spaceIds[actionId],
actionTaskParamsId: so.id,
},
state: {},
scope: ['actions'],
};
});
await taskManager.bulkSchedule(taskInstances);
};
}
export function createEphemeralExecutionEnqueuerFunction({
taskManager,
actionTypeRegistry,
@ -194,3 +306,41 @@ async function getAction(
const { attributes } = await unsecuredSavedObjectsClient.get<RawAction>('action', actionId);
return { action: attributes, isPreconfigured: false };
}
async function getConnectors(
unsecuredSavedObjectsClient: SavedObjectsClientContract,
preconfiguredConnectors: PreConfiguredAction[],
connectorIds: string[]
): Promise<GetConnectorsResult[]> {
const result: GetConnectorsResult[] = [];
const connectorIdsToFetch = [];
for (const connectorId of connectorIds) {
const pcConnector = preconfiguredConnectors.find((connector) => connector.id === connectorId);
if (pcConnector) {
result.push({ connector: pcConnector, isPreconfigured: true, id: connectorId });
} else {
connectorIdsToFetch.push(connectorId);
}
}
if (connectorIdsToFetch.length > 0) {
const bulkGetResult = await unsecuredSavedObjectsClient.bulkGet<RawAction>(
connectorIdsToFetch.map((id) => ({
id,
type: 'action',
}))
);
for (const item of bulkGetResult.saved_objects) {
if (item.error) throw item.error;
result.push({
isPreconfigured: false,
connector: item.attributes,
id: item.id,
});
}
}
return result;
}

View file

@ -29,4 +29,16 @@ describe('trackLegacyRBACExemption', () => {
}
expect(err).toBeUndefined();
});
it('should call `usageCounter.incrementCounter` and increment by the passed in value', () => {
const mockUsageCountersSetup = usageCountersServiceMock.createSetupContract();
const mockUsageCounter = mockUsageCountersSetup.createUsageCounter('test');
trackLegacyRBACExemption('test', mockUsageCounter, 15);
expect(mockUsageCounter.incrementCounter).toHaveBeenCalledWith({
counterName: `source_test`,
counterType: 'legacyRBACExemption',
incrementBy: 15,
});
});
});

View file

@ -7,12 +7,16 @@
import { UsageCounter } from '@kbn/usage-collection-plugin/server';
export function trackLegacyRBACExemption(source: string, usageCounter?: UsageCounter) {
export function trackLegacyRBACExemption(
source: string,
usageCounter?: UsageCounter,
increment?: number
) {
if (usageCounter) {
usageCounter.incrementCounter({
counterName: `source_${source}`,
counterType: 'legacyRBACExemption',
incrementBy: 1,
incrementBy: increment ? increment : 1,
});
}
}

View file

@ -47,6 +47,7 @@ import { ActionTypeRegistry } from './action_type_registry';
import {
createExecutionEnqueuerFunction,
createEphemeralExecutionEnqueuerFunction,
createBulkExecutionEnqueuerFunction,
} from './create_execute_function';
import { registerBuiltInActionTypes } from './builtin_action_types';
import { registerActionsUsageCollector } from './usage';
@ -437,6 +438,12 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
isESOCanEncrypt: isESOCanEncrypt!,
preconfiguredActions,
}),
bulkExecutionEnqueuer: createBulkExecutionEnqueuerFunction({
taskManager: plugins.taskManager,
actionTypeRegistry: actionTypeRegistry!,
isESOCanEncrypt: isESOCanEncrypt!,
preconfiguredActions,
}),
auditLogger: this.security?.audit.asScoped(request),
usageCounter: this.usageCounter,
connectorTokenClient: new ConnectorTokenClient({
@ -626,6 +633,12 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
isESOCanEncrypt: isESOCanEncrypt!,
preconfiguredActions,
}),
bulkExecutionEnqueuer: createBulkExecutionEnqueuerFunction({
taskManager,
actionTypeRegistry: actionTypeRegistry!,
isESOCanEncrypt: isESOCanEncrypt!,
preconfiguredActions,
}),
auditLogger: security?.audit.asScoped(request),
usageCounter,
connectorTokenClient: new ConnectorTokenClient({

View file

@ -145,37 +145,39 @@ describe('Create Execution Handler', () => {
expect(mockActionsPlugin.getActionsClientWithRequest).toHaveBeenCalledWith(
createExecutionHandlerParams.request
);
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.enqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
],
"source": Object {
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"id": "1",
"type": "alert",
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"type": "SAVED_OBJECT",
"spaceId": "test1",
},
"spaceId": "test1",
},
],
]
`);
@ -241,31 +243,33 @@ describe('Create Execution Handler', () => {
});
expect(ruleRunMetricsStore.getNumberOfTriggeredActions()).toBe(1);
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(2);
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.enqueueExecution).toHaveBeenCalledWith({
consumer: 'rule-consumer',
id: '2',
params: {
foo: true,
contextVal: 'My other goes here',
stateVal: 'My other goes here',
},
source: asSavedObjectExecutionSource({
id: '1',
type: 'alert',
}),
relatedSavedObjects: [
{
id: '1',
namespace: 'test1',
type: 'alert',
typeId: 'test',
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledWith([
{
consumer: 'rule-consumer',
id: '2',
params: {
foo: true,
contextVal: 'My other goes here',
stateVal: 'My other goes here',
},
],
spaceId: 'test1',
apiKey: createExecutionHandlerParams.apiKey,
executionId: '5f6aa57d-3e22-484e-bae8-cbed868f4d28',
});
source: asSavedObjectExecutionSource({
id: '1',
type: 'alert',
}),
relatedSavedObjects: [
{
id: '1',
namespace: 'test1',
type: 'alert',
typeId: 'test',
},
],
spaceId: 'test1',
apiKey: createExecutionHandlerParams.apiKey,
executionId: '5f6aa57d-3e22-484e-bae8-cbed868f4d28',
},
]);
});
test('trow error error message when action type is disabled', async () => {
@ -305,7 +309,7 @@ describe('Create Execution Handler', () => {
});
expect(ruleRunMetricsStore.getNumberOfTriggeredActions()).toBe(0);
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(2);
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(0);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(0);
mockActionsPlugin.isActionExecutable.mockImplementation(() => true);
const executionHandlerForPreconfiguredAction = createExecutionHandler({
@ -319,7 +323,7 @@ describe('Create Execution Handler', () => {
alertId: '2',
ruleRunMetricsStore,
});
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
});
test('limits actionsPlugin.execute per action group', async () => {
@ -333,7 +337,7 @@ describe('Create Execution Handler', () => {
});
expect(ruleRunMetricsStore.getNumberOfTriggeredActions()).toBe(0);
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(0);
expect(actionsClient.enqueueExecution).not.toHaveBeenCalled();
expect(actionsClient.bulkEnqueueExecution).not.toHaveBeenCalled();
});
test('context attribute gets parameterized', async () => {
@ -347,37 +351,39 @@ describe('Create Execution Handler', () => {
});
expect(ruleRunMetricsStore.getNumberOfTriggeredActions()).toBe(1);
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(1);
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.enqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My context-val goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My context-val goes here",
"foo": true,
"stateVal": "My goes here",
},
],
"source": Object {
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"id": "1",
"type": "alert",
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"type": "SAVED_OBJECT",
"spaceId": "test1",
},
"spaceId": "test1",
},
],
]
`);
});
@ -391,37 +397,39 @@ describe('Create Execution Handler', () => {
alertId: '2',
ruleRunMetricsStore,
});
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.enqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My state-val goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My state-val goes here",
},
],
"source": Object {
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"id": "1",
"type": "alert",
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"type": "SAVED_OBJECT",
"spaceId": "test1",
},
"spaceId": "test1",
},
],
]
`);
});
@ -502,7 +510,7 @@ describe('Create Execution Handler', () => {
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(3);
expect(ruleRunMetricsStore.getTriggeredActionsStatus()).toBe(ActionsCompletion.PARTIAL);
expect(createExecutionHandlerParams.logger.debug).toHaveBeenCalledTimes(1);
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(2);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
});
test('Skips triggering actions for a specific action type when it reaches the limit for that specific action type', async () => {
@ -582,6 +590,6 @@ describe('Create Execution Handler', () => {
.numberOfTriggeredActions
).toBe(2);
expect(ruleRunMetricsStore.getTriggeredActionsStatus()).toBe(ActionsCompletion.PARTIAL);
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(4);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
});
});

View file

@ -6,6 +6,7 @@
*/
import { asSavedObjectExecutionSource } from '@kbn/actions-plugin/server';
import { isEphemeralTaskRejectedDueToCapacityError } from '@kbn/task-manager-plugin/server';
import { chunk } from 'lodash';
import { transformActionParams } from './transform_action_params';
import { injectActionParams } from './inject_action_params';
import {
@ -60,6 +61,7 @@ export function createExecutionHandler<
const ruleTypeActionGroups = new Map(
ruleType.actionGroups.map((actionGroup) => [actionGroup.id, actionGroup.name])
);
const CHUNK_SIZE = 1000;
return async ({
actionGroup,
@ -115,6 +117,8 @@ export function createExecutionHandler<
const actionsClient = await actionsPlugin.getActionsClientWithRequest(request);
let ephemeralActionsToSchedule = maxEphemeralActionsPerRule;
const bulkActions = [];
const logActions = [];
for (const action of actions) {
const { actionTypeId } = action;
@ -188,14 +192,13 @@ export function createExecutionHandler<
await actionsClient.ephemeralEnqueuedExecution(enqueueOptions);
} catch (err) {
if (isEphemeralTaskRejectedDueToCapacityError(err)) {
await actionsClient.enqueueExecution(enqueueOptions);
bulkActions.push(enqueueOptions);
}
}
} else {
await actionsClient.enqueueExecution(enqueueOptions);
bulkActions.push(enqueueOptions);
}
alertingEventLogger.logAction({
logActions.push({
id: action.id,
typeId: actionTypeId,
alertId,
@ -203,5 +206,13 @@ export function createExecutionHandler<
alertSubgroup: actionSubgroup,
});
}
for (const c of chunk(bulkActions, CHUNK_SIZE)) {
await actionsClient.bulkEnqueueExecution(c);
}
for (const action of logActions) {
alertingEventLogger.logAction(action);
}
};
}

View file

@ -245,31 +245,34 @@ export const generateRunnerResult = ({
};
};
export const generateEnqueueFunctionInput = () => ({
apiKey: 'MTIzOmFiYw==',
executionId: '5f6aa57d-3e22-484e-bae8-cbed868f4d28',
id: '1',
params: {
foo: true,
},
consumer: 'bar',
relatedSavedObjects: [
{
id: '1',
namespace: undefined,
type: 'alert',
typeId: RULE_TYPE_ID,
export const generateEnqueueFunctionInput = (isArray: boolean = false) => {
const input = {
apiKey: 'MTIzOmFiYw==',
executionId: '5f6aa57d-3e22-484e-bae8-cbed868f4d28',
id: '1',
params: {
foo: true,
},
],
source: {
consumer: 'bar',
relatedSavedObjects: [
{
id: '1',
namespace: undefined,
type: 'alert',
typeId: RULE_TYPE_ID,
},
],
source: {
id: '1',
type: 'alert',
source: {
id: '1',
type: 'alert',
},
type: 'SAVED_OBJECT',
},
type: 'SAVED_OBJECT',
},
spaceId: 'default',
});
spaceId: 'default',
};
return isArray ? [input] : input;
};
export const generateAlertInstance = ({ id, duration, start }: GeneratorParams = { id: 1 }) => ({
[String(id)]: {

View file

@ -16,11 +16,7 @@ import {
AlertInstanceContext,
RuleExecutionStatusWarningReasons,
} from '../types';
import {
ConcreteTaskInstance,
isUnrecoverableError,
RunNowResult,
} from '@kbn/task-manager-plugin/server';
import { ConcreteTaskInstance, isUnrecoverableError } from '@kbn/task-manager-plugin/server';
import { TaskRunnerContext } from './task_runner_factory';
import { TaskRunner } from './task_runner';
import { encryptedSavedObjectsMock } from '@kbn/encrypted-saved-objects-plugin/server/mocks';
@ -41,7 +37,6 @@ import { IEventLogger } from '@kbn/event-log-plugin/server';
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import { omit } from 'lodash';
import { ruleTypeRegistryMock } from '../rule_type_registry.mock';
import { ExecuteOptions } from '@kbn/actions-plugin/server/create_execute_function';
import { inMemoryMetricsMock } from '../monitoring/in_memory_metrics.mock';
import moment from 'moment';
import {
@ -130,8 +125,6 @@ describe('Task Runner', () => {
executionContext: ReturnType<typeof executionContextServiceMock.createInternalStartContract>;
};
type EnqueueFunction = (options: ExecuteOptions) => Promise<void | RunNowResult>;
const taskRunnerFactoryInitializerParams: TaskRunnerFactoryInitializerParamsType = {
data: dataPlugin,
savedObjects: savedObjectsService,
@ -163,10 +156,11 @@ describe('Task Runner', () => {
[
nameExtension: string,
customTaskRunnerFactoryInitializerParams: TaskRunnerFactoryInitializerParamsType,
enqueueFunction: EnqueueFunction
enqueueFunction: unknown,
inputIsArray: boolean
]
> = [
['', taskRunnerFactoryInitializerParams, actionsClient.enqueueExecution],
['', taskRunnerFactoryInitializerParams, actionsClient.bulkEnqueueExecution, true],
[
' (with ephemeral support)',
{
@ -174,6 +168,7 @@ describe('Task Runner', () => {
supportsEphemeralTasks: true,
},
actionsClient.ephemeralEnqueuedExecution,
false,
],
];
@ -294,7 +289,12 @@ describe('Task Runner', () => {
test.each(ephemeralTestParams)(
'actionsPlugin.execute is called per alert alert that is scheduled %s',
async (nameExtension, customTaskRunnerFactoryInitializerParams, enqueueFunction) => {
async (
nameExtension,
customTaskRunnerFactoryInitializerParams,
enqueueFunction,
inputIsArray
) => {
customTaskRunnerFactoryInitializerParams.actionsPlugin.isActionTypeEnabled.mockReturnValue(
true
);
@ -329,7 +329,7 @@ describe('Task Runner', () => {
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(SAVED_OBJECT);
await taskRunner.run();
expect(enqueueFunction).toHaveBeenCalledTimes(1);
expect(enqueueFunction).toHaveBeenCalledWith(generateEnqueueFunctionInput());
expect(enqueueFunction).toHaveBeenCalledWith(generateEnqueueFunctionInput(inputIsArray));
expect(logger.debug).toHaveBeenCalledTimes(5);
expect(logger.debug).nthCalledWith(1, 'executing rule test:1 at 1970-01-01T00:00:00.000Z');
@ -536,7 +536,7 @@ describe('Task Runner', () => {
await taskRunner.run();
const expectedExecutions = shouldBeSnoozed ? 0 : 1;
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(expectedExecutions);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(expectedExecutions);
expect(actionsClient.ephemeralEnqueuedExecution).toHaveBeenCalledTimes(0);
const expectedMessage = `no scheduling of actions for rule test:1: '${RULE_NAME}': rule is snoozed.`;
@ -945,7 +945,12 @@ describe('Task Runner', () => {
test.each(ephemeralTestParams)(
'includes the apiKey in the request used to initialize the actionsClient %s',
async (nameExtension, customTaskRunnerFactoryInitializerParams, enqueueFunction) => {
async (
nameExtension,
customTaskRunnerFactoryInitializerParams,
enqueueFunction,
inputIsArray
) => {
customTaskRunnerFactoryInitializerParams.actionsPlugin.isActionTypeEnabled.mockReturnValue(
true
);
@ -998,7 +1003,7 @@ describe('Task Runner', () => {
);
expect(enqueueFunction).toHaveBeenCalledTimes(1);
expect(enqueueFunction).toHaveBeenCalledWith(generateEnqueueFunctionInput());
expect(enqueueFunction).toHaveBeenCalledWith(generateEnqueueFunctionInput(inputIsArray));
testAlertingEventLogCalls({
activeAlerts: 1,
@ -1033,7 +1038,12 @@ describe('Task Runner', () => {
test.each(ephemeralTestParams)(
'fire recovered actions for execution for the alertInstances which is in the recovered state %s',
async (nameExtension, customTaskRunnerFactoryInitializerParams, enqueueFunction) => {
async (
nameExtension,
customTaskRunnerFactoryInitializerParams,
enqueueFunction,
inputIsArray
) => {
customTaskRunnerFactoryInitializerParams.actionsPlugin.isActionTypeEnabled.mockReturnValue(
true
);
@ -1149,14 +1159,19 @@ describe('Task Runner', () => {
);
expect(enqueueFunction).toHaveBeenCalledTimes(2);
expect(enqueueFunction).toHaveBeenCalledWith(generateEnqueueFunctionInput());
expect(enqueueFunction).toHaveBeenCalledWith(generateEnqueueFunctionInput(inputIsArray));
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
}
);
test.each(ephemeralTestParams)(
"should skip alertInstances which weren't active on the previous execution %s",
async (nameExtension, customTaskRunnerFactoryInitializerParams, enqueueFunction) => {
async (
nameExtension,
customTaskRunnerFactoryInitializerParams,
enqueueFunction,
inputIsArray
) => {
customTaskRunnerFactoryInitializerParams.actionsPlugin.isActionTypeEnabled.mockReturnValue(
true
);
@ -1231,15 +1246,25 @@ describe('Task Runner', () => {
});
expect(enqueueFunction).toHaveBeenCalledTimes(2);
expect((enqueueFunction as jest.Mock).mock.calls[1][0].id).toEqual('2');
expect((enqueueFunction as jest.Mock).mock.calls[0][0].id).toEqual('1');
if (inputIsArray) {
expect((enqueueFunction as jest.Mock).mock.calls[1][0][0].id).toEqual('2');
expect((enqueueFunction as jest.Mock).mock.calls[0][0][0].id).toEqual('1');
} else {
expect((enqueueFunction as jest.Mock).mock.calls[1][0].id).toEqual('2');
expect((enqueueFunction as jest.Mock).mock.calls[0][0].id).toEqual('1');
}
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
}
);
test.each(ephemeralTestParams)(
'fire actions under a custom recovery group when specified on an alert type for alertInstances which are in the recovered state %s',
async (nameExtension, customTaskRunnerFactoryInitializerParams, enqueueFunction) => {
async (
nameExtension,
customTaskRunnerFactoryInitializerParams,
enqueueFunction,
inputIsArray
) => {
customTaskRunnerFactoryInitializerParams.actionsPlugin.isActionTypeEnabled.mockReturnValue(
true
);
@ -1329,7 +1354,7 @@ describe('Task Runner', () => {
});
expect(enqueueFunction).toHaveBeenCalledTimes(2);
expect(enqueueFunction).toHaveBeenCalledWith(generateEnqueueFunctionInput());
expect(enqueueFunction).toHaveBeenCalledWith(generateEnqueueFunctionInput(inputIsArray));
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
}
);
@ -2560,7 +2585,7 @@ describe('Task Runner', () => {
const runnerResult = await taskRunner.run();
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(actionsConfigMap.default.max);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(
taskRunnerFactoryInitializerParams.internalSavedObjectsRepository.update
@ -2712,8 +2737,8 @@ describe('Task Runner', () => {
const runnerResult = await taskRunner.run();
// 1x(.server-log) and 2x(any-action) per alert
expect(actionsClient.enqueueExecution).toHaveBeenCalledTimes(5);
// 1x(.server-log) and 1x(any-action) per alert
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(2);
expect(
taskRunnerFactoryInitializerParams.internalSavedObjectsRepository.update
@ -2756,7 +2781,7 @@ describe('Task Runner', () => {
expect(logger.debug).nthCalledWith(
3,
'Rule "1" skipped scheduling action "1" because the maximum number of allowed actions for connector type .server-log has been reached.'
'Rule "1" skipped scheduling action "2" because the maximum number of allowed actions for connector type .server-log has been reached.'
);
testAlertingEventLogCalls({

View file

@ -29,6 +29,7 @@ const createStartMock = () => {
removeIfExists: jest.fn(),
supportsEphemeralTasks: jest.fn(),
bulkUpdateSchedules: jest.fn(),
bulkSchedule: jest.fn(),
};
return mock;
};

View file

@ -48,7 +48,12 @@ export interface TaskManagerSetupContract {
export type TaskManagerStartContract = Pick<
TaskScheduling,
'schedule' | 'runSoon' | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules'
| 'schedule'
| 'runSoon'
| 'ephemeralRunNow'
| 'ensureScheduled'
| 'bulkUpdateSchedules'
| 'bulkSchedule'
> &
Pick<TaskStore, 'fetch' | 'aggregate' | 'get' | 'remove'> & {
removeIfExists: TaskStore['remove'];
@ -243,6 +248,7 @@ export class TaskManagerPlugin
remove: (id: string) => taskStore.remove(id),
removeIfExists: (id: string) => removeIfExists(taskStore, id),
schedule: (...args) => taskScheduling.schedule(...args),
bulkSchedule: (...args) => taskScheduling.bulkSchedule(...args),
ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
runSoon: (...args) => taskScheduling.runSoon(...args),
bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),

View file

@ -496,6 +496,47 @@ describe('TaskScheduling', () => {
);
});
});
describe('bulkSchedule', () => {
test('allows scheduling tasks', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task = {
taskType: 'foo',
params: {},
state: {},
};
await taskScheduling.bulkSchedule([task]);
expect(mockTaskStore.bulkSchedule).toHaveBeenCalled();
expect(mockTaskStore.bulkSchedule).toHaveBeenCalledWith([
{
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
},
]);
});
test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.bulkSchedule.mockRejectedValueOnce({
statusCode: 409,
});
return expect(
taskScheduling.bulkSchedule([
{
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
},
])
).rejects.toMatchObject({
statusCode: 409,
});
});
});
});
function mockTask(overrides: Partial<ConcreteTaskInstance> = {}): ConcreteTaskInstance {

View file

@ -129,6 +129,33 @@ export class TaskScheduling {
});
}
/**
* Bulk schedules a task.
*
* @param tasks - The tasks being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async bulkSchedule(
taskInstances: TaskInstanceWithDeprecatedFields[],
options?: Record<string, unknown>
): Promise<ConcreteTaskInstance[]> {
const traceparent =
agent.currentTransaction && agent.currentTransaction.type !== 'request'
? agent.currentTraceparent
: '';
const modifiedTasks = await Promise.all(
taskInstances.map(async (taskInstance) => {
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
return { ...modifiedTask, traceparent: traceparent || '' };
})
);
return await this.store.bulkSchedule(modifiedTasks);
}
/**
* Bulk updates schedules for tasks by ids.
* Only tasks with `idle` status will be updated, as for the tasks which have `running` status,

View file

@ -18,6 +18,7 @@ export const taskStoreMock = {
update: jest.fn(),
remove: jest.fn(),
schedule: jest.fn(),
bulkSchedule: jest.fn(),
bulkUpdate: jest.fn(),
get: jest.fn(),
getLifecycle: jest.fn(),

View file

@ -670,6 +670,142 @@ describe('TaskStore', () => {
return expect(store.getLifecycle(randomId())).rejects.toThrow('Bad Request');
});
});
describe('bulkSchedule', () => {
let store: TaskStore;
beforeAll(() => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
});
async function testBulkSchedule(task: unknown) {
savedObjectsClient.bulkCreate.mockImplementation(async () => ({
saved_objects: [
{
id: 'testid',
type: 'test',
attributes: {
attempts: 0,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
},
references: [],
version: '123',
},
],
}));
const result = await store.bulkSchedule(task as TaskInstance[]);
expect(savedObjectsClient.bulkCreate).toHaveBeenCalledTimes(1);
return result;
}
test('serializes the params and state', async () => {
const task = {
id: 'id',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
traceparent: 'apmTraceparent',
};
const result = await testBulkSchedule([task]);
expect(savedObjectsClient.bulkCreate).toHaveBeenCalledWith(
[
{
id: 'id',
type: 'task',
attributes: {
attempts: 0,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
},
},
],
{ refresh: false }
);
expect(result).toEqual([
{
id: 'testid',
attempts: 0,
schedule: undefined,
params: { hello: 'world' },
retryAt: null,
runAt: mockedDate,
scheduledAt: mockedDate,
scope: undefined,
startedAt: null,
state: { foo: 'bar' },
status: 'idle',
taskType: 'report',
user: undefined,
version: '123',
traceparent: 'apmTraceparent',
},
]);
});
test('returns a concrete task instance', async () => {
const task = {
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
};
const result = await testBulkSchedule([task]);
expect(result).toMatchObject([
{
...task,
id: 'testid',
},
]);
});
test('errors if the task type is unknown', async () => {
await expect(testBulkSchedule([{ taskType: 'nope', params: {}, state: {} }])).rejects.toThrow(
/Unsupported task type "nope"/i
);
});
test('pushes error from saved objects client to errors$', async () => {
const task: TaskInstance = {
id: 'id',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
};
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.bulkCreate.mockRejectedValue(new Error('Failure'));
await expect(store.bulkSchedule([task])).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure"`
);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});
});
const randomId = () => `id-${_.random(1, 20)}`;

View file

@ -147,6 +147,35 @@ export class TaskStore {
return savedObjectToConcreteTaskInstance(savedObject);
}
/**
* Bulk schedules a task.
*
* @param tasks - The tasks being scheduled.
*/
public async bulkSchedule(taskInstances: TaskInstance[]): Promise<ConcreteTaskInstance[]> {
const objects = taskInstances.map((taskInstance) => {
this.definitions.ensureHas(taskInstance.taskType);
return {
type: 'task',
attributes: taskInstanceToAttributes(taskInstance),
id: taskInstance.id,
};
});
let savedObjects;
try {
savedObjects = await this.savedObjectsRepository.bulkCreate<SerializedConcreteTaskInstance>(
objects,
{ refresh: false }
);
} catch (e) {
this.errors$.next(e);
throw e;
}
return savedObjects.saved_objects.map((so) => savedObjectToConcreteTaskInstance(so));
}
/**
* Fetches a list of scheduled tasks with default sorting.
*