[ResponseOps][Alerting] Create xpack.actions.queued.max circuit breaker (#164632)

Resolves https://github.com/elastic/kibana/issues/162264


## Summary

Adds a limit on the maximum number of actions that can be queued with a
circuit breaker. The limit in serverless is set to 10,000, and 1,000,000
in the other environments.

- If a rule execution exceeds the limit, the circuit breaker kicks in
and stops triggering actions.
- Alerting rule's status updated to warning when circuit breaker is hit

Did not update the `enqueueExecution` bc it's going to be removed in
https://github.com/elastic/kibana/pull/165120.


### Checklist

Delete any items that are not applicable to this PR.

- [x] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios


### To Verify

- Create a 2 rules that have actions
- Set `xpack.actions.queued.max` in kibana.yml to a low number like 2 or
3
- Use the run soon button to queue up actions and hit the circuit
breaker.
- The actions will not be scheduled and the rule status will be set to
warning

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alexi Doak 2023-09-07 06:59:01 -07:00 committed by GitHub
parent f2929192bd
commit 03f0cdc327
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
47 changed files with 1359 additions and 275 deletions

View file

@ -119,6 +119,7 @@ xpack.alerting.rules.run.ruleTypeOverrides:
xpack.alerting.rules.minimumScheduleInterval.enforce: true
xpack.alerting.rules.maxScheduledPerMinute: 400
xpack.actions.run.maxAttempts: 10
xpack.actions.queued.max: 10000
# Disables ESQL in advanced settings (hides it from the UI)
uiSettings:

View file

@ -227,6 +227,9 @@ xpack.actions.run:
maxAttempts: 5
--
`xpack.actions.queued.max` {ess-icon}::
Specifies the maximum number of actions that can be queued. Default: 1000000
[float]
[[preconfigured-connector-settings]]
=== Preconfigured connector settings

View file

@ -140,6 +140,22 @@ function recordOf<K extends string, V>(
return new RecordOfType(keyType, valueType, options);
}
function oneOf<A, B, C, D, E, F, G, H, I, J, K>(
types: [
Type<A>,
Type<B>,
Type<C>,
Type<D>,
Type<E>,
Type<F>,
Type<G>,
Type<H>,
Type<I>,
Type<J>,
Type<K>
],
options?: TypeOptions<A | B | C | D | E | F | G | H | I | J | K>
): Type<A | B | C | D | E | F | G | H | I | J | K>;
function oneOf<A, B, C, D, E, F, G, H, I, J>(
types: [Type<A>, Type<B>, Type<C>, Type<D>, Type<E>, Type<F>, Type<G>, Type<H>, Type<I>, Type<J>],
options?: TypeOptions<A | B | C | D | E | F | G | H | I | J>

View file

@ -3019,6 +3019,7 @@ describe('bulkEnqueueExecution()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'my-action-type',
},
{
id: uuidv4(),
@ -3027,6 +3028,7 @@ describe('bulkEnqueueExecution()', () => {
executionId: '456def',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'my-action-type',
},
]);
expect(authorization.ensureAuthorized).toHaveBeenCalledWith({
@ -3051,6 +3053,7 @@ describe('bulkEnqueueExecution()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'my-action-type',
},
{
id: uuidv4(),
@ -3059,6 +3062,7 @@ describe('bulkEnqueueExecution()', () => {
executionId: '456def',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'my-action-type',
},
])
).rejects.toMatchInlineSnapshot(`[Error: Unauthorized to execute all actions]`);
@ -3081,6 +3085,7 @@ describe('bulkEnqueueExecution()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'my-action-type',
},
{
id: uuidv4(),
@ -3089,6 +3094,7 @@ describe('bulkEnqueueExecution()', () => {
executionId: '456def',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'my-action-type',
},
]);
@ -3112,6 +3118,7 @@ describe('bulkEnqueueExecution()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'my-action-type',
},
{
id: uuidv4(),
@ -3120,6 +3127,7 @@ describe('bulkEnqueueExecution()', () => {
executionId: '456def',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'my-action-type',
},
];
await expect(actionsClient.bulkEnqueueExecution(opts)).resolves.toMatchInlineSnapshot(

View file

@ -55,6 +55,7 @@ import {
ExecutionEnqueuer,
ExecuteOptions as EnqueueExecutionOptions,
BulkExecutionEnqueuer,
ExecutionResponse,
} from '../create_execute_function';
import { ActionsAuthorization } from '../authorization/actions_authorization';
import {
@ -114,7 +115,7 @@ export interface ConstructorOptions {
inMemoryConnectors: InMemoryConnector[];
actionExecutor: ActionExecutorContract;
ephemeralExecutionEnqueuer: ExecutionEnqueuer<RunNowResult>;
bulkExecutionEnqueuer: BulkExecutionEnqueuer<void>;
bulkExecutionEnqueuer: BulkExecutionEnqueuer<ExecutionResponse>;
request: KibanaRequest;
authorization: ActionsAuthorization;
auditLogger?: AuditLogger;
@ -139,7 +140,7 @@ export interface ActionsClientContext {
request: KibanaRequest;
authorization: ActionsAuthorization;
ephemeralExecutionEnqueuer: ExecutionEnqueuer<RunNowResult>;
bulkExecutionEnqueuer: BulkExecutionEnqueuer<void>;
bulkExecutionEnqueuer: BulkExecutionEnqueuer<ExecutionResponse>;
auditLogger?: AuditLogger;
usageCounter?: UsageCounter;
connectorTokenClient: ConnectorTokenClientContract;
@ -766,7 +767,9 @@ export class ActionsClient {
});
}
public async bulkEnqueueExecution(options: EnqueueExecutionOptions[]): Promise<void> {
public async bulkEnqueueExecution(
options: EnqueueExecutionOptions[]
): Promise<ExecutionResponse> {
const sources: Array<ActionExecutionSource<unknown>> = [];
options.forEach((option) => {
if (option.source) {

View file

@ -28,6 +28,7 @@ const createActionsConfigMock = () => {
validateEmailAddresses: jest.fn().mockReturnValue(undefined),
getMaxAttempts: jest.fn().mockReturnValue(3),
enableFooterInEmail: jest.fn().mockReturnValue(true),
getMaxQueued: jest.fn().mockReturnValue(1000),
};
return mocked;
};

View file

@ -563,3 +563,20 @@ describe('getMaxAttempts()', () => {
expect(maxAttempts).toEqual(3);
});
});
describe('getMaxQueued()', () => {
test('returns the queued actions max defined in config', () => {
const acu = getActionsConfigurationUtilities({
...defaultActionsConfig,
queued: { max: 1 },
});
const max = acu.getMaxQueued();
expect(max).toEqual(1);
});
test('returns the default queued actions max', () => {
const acu = getActionsConfigurationUtilities(defaultActionsConfig);
const max = acu.getMaxQueued();
expect(max).toEqual(1000000);
});
});

View file

@ -11,7 +11,13 @@ import url from 'url';
import { curry } from 'lodash';
import { pipe } from 'fp-ts/lib/pipeable';
import { ActionsConfig, AllowedHosts, EnabledActionTypes, CustomHostSettings } from './config';
import {
ActionsConfig,
AllowedHosts,
EnabledActionTypes,
CustomHostSettings,
DEFAULT_QUEUED_MAX,
} from './config';
import { getCanonicalCustomHostUrl } from './lib/custom_host_settings';
import { ActionTypeDisabledError } from './lib';
import { ProxySettings, ResponseSettings, SSLSettings } from './types';
@ -54,6 +60,7 @@ export interface ActionsConfigurationUtilities {
options?: ValidateEmailAddressesOptions
): string | undefined;
enableFooterInEmail: () => boolean;
getMaxQueued: () => number;
}
function allowListErrorMessage(field: AllowListingField, value: string) {
@ -217,5 +224,6 @@ export function getActionsConfigurationUtilities(
);
},
enableFooterInEmail: () => config.enableFooterInEmail,
getMaxQueued: () => config.queued?.max || DEFAULT_QUEUED_MAX,
};
}

View file

@ -19,6 +19,9 @@ export enum EnabledActionTypes {
const MAX_MAX_ATTEMPTS = 10;
const MIN_MAX_ATTEMPTS = 1;
const MIN_QUEUED_MAX = 1;
export const DEFAULT_QUEUED_MAX = 1000000;
const preconfiguredActionSchema = schema.object({
name: schema.string({ minLength: 1 }),
actionTypeId: schema.string({ minLength: 1 }),
@ -130,6 +133,11 @@ export const configSchema = schema.object({
})
),
enableFooterInEmail: schema.boolean({ defaultValue: true }),
queued: schema.maybe(
schema.object({
max: schema.maybe(schema.number({ min: MIN_QUEUED_MAX, defaultValue: DEFAULT_QUEUED_MAX })),
})
),
});
export type ActionsConfig = TypeOf<typeof configSchema>;

View file

@ -15,12 +15,24 @@ import {
asHttpRequestExecutionSource,
asSavedObjectExecutionSource,
} from './lib/action_execution_source';
import { actionsConfigMock } from './actions_config.mock';
const mockTaskManager = taskManagerMock.createStart();
const savedObjectsClient = savedObjectsClientMock.create();
const request = {} as KibanaRequest;
const mockActionsConfig = actionsConfigMock.create();
beforeEach(() => jest.resetAllMocks());
beforeEach(() => {
jest.resetAllMocks();
mockTaskManager.aggregate.mockResolvedValue({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 0, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
});
mockActionsConfig.getMaxQueued.mockReturnValue(10);
});
describe('bulkExecute()', () => {
test('schedules the action with all given parameters', async () => {
@ -30,6 +42,7 @@ describe('bulkExecute()', () => {
actionTypeRegistry,
isESOCanEncrypt: true,
inMemoryConnectors: [],
configurationUtilities: mockActionsConfig,
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
@ -63,6 +76,7 @@ describe('bulkExecute()', () => {
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
source: asHttpRequestExecutionSource(request),
actionTypeId: 'mock-action',
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
@ -118,6 +132,7 @@ describe('bulkExecute()', () => {
actionTypeRegistry,
isESOCanEncrypt: true,
inMemoryConnectors: [],
configurationUtilities: mockActionsConfig,
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
@ -153,6 +168,7 @@ describe('bulkExecute()', () => {
consumer: 'test-consumer',
apiKey: Buffer.from('123:abc').toString('base64'),
source: asHttpRequestExecutionSource(request),
actionTypeId: 'mock-action',
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
@ -209,6 +225,7 @@ describe('bulkExecute()', () => {
actionTypeRegistry,
isESOCanEncrypt: true,
inMemoryConnectors: [],
configurationUtilities: mockActionsConfig,
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
@ -248,6 +265,7 @@ describe('bulkExecute()', () => {
typeId: 'some-typeId',
},
],
actionTypeId: 'mock-action',
},
]);
expect(savedObjectsClient.bulkCreate).toHaveBeenCalledWith(
@ -304,6 +322,7 @@ describe('bulkExecute()', () => {
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
const source = { type: 'alert', id: uuidv4() };
@ -339,6 +358,7 @@ describe('bulkExecute()', () => {
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
source: asSavedObjectExecutionSource(source),
actionTypeId: 'mock-action',
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
@ -401,6 +421,7 @@ describe('bulkExecute()', () => {
isSystemAction: true,
},
],
configurationUtilities: mockActionsConfig,
});
const source = { type: 'alert', id: uuidv4() };
@ -436,6 +457,7 @@ describe('bulkExecute()', () => {
executionId: 'system-connector-.casesabc',
apiKey: Buffer.from('system-connector-test.system-action:abc').toString('base64'),
source: asSavedObjectExecutionSource(source),
actionTypeId: 'mock-action',
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
@ -498,6 +520,7 @@ describe('bulkExecute()', () => {
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
const source = { type: 'alert', id: uuidv4() };
@ -541,6 +564,7 @@ describe('bulkExecute()', () => {
typeId: 'some-typeId',
},
],
actionTypeId: 'mock-action',
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
@ -616,6 +640,7 @@ describe('bulkExecute()', () => {
isSystemAction: true,
},
],
configurationUtilities: mockActionsConfig,
});
const source = { type: 'alert', id: uuidv4() };
@ -659,6 +684,7 @@ describe('bulkExecute()', () => {
typeId: 'some-typeId',
},
],
actionTypeId: 'mock-action',
},
]);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
@ -723,6 +749,7 @@ describe('bulkExecute()', () => {
isESOCanEncrypt: false,
actionTypeRegistry: actionTypeRegistryMock.create(),
inMemoryConnectors: [],
configurationUtilities: mockActionsConfig,
});
await expect(
executeFn(savedObjectsClient, [
@ -733,6 +760,7 @@ describe('bulkExecute()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'mock-action',
},
])
).rejects.toThrowErrorMatchingInlineSnapshot(
@ -746,6 +774,7 @@ describe('bulkExecute()', () => {
isESOCanEncrypt: true,
actionTypeRegistry: actionTypeRegistryMock.create(),
inMemoryConnectors: [],
configurationUtilities: mockActionsConfig,
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [
@ -770,6 +799,7 @@ describe('bulkExecute()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'mock-action',
},
])
).rejects.toThrowErrorMatchingInlineSnapshot(
@ -784,6 +814,7 @@ describe('bulkExecute()', () => {
isESOCanEncrypt: true,
actionTypeRegistry: mockedActionTypeRegistry,
inMemoryConnectors: [],
configurationUtilities: mockActionsConfig,
});
mockedActionTypeRegistry.ensureActionTypeEnabled.mockImplementation(() => {
throw new Error('Fail');
@ -810,6 +841,7 @@ describe('bulkExecute()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'mock-action',
},
])
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`);
@ -833,6 +865,7 @@ describe('bulkExecute()', () => {
isSystemAction: false,
},
],
configurationUtilities: mockActionsConfig,
});
mockedActionTypeRegistry.isActionExecutable.mockImplementation(() => true);
savedObjectsClient.bulkGet.mockResolvedValueOnce({
@ -868,6 +901,7 @@ describe('bulkExecute()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'mock-action',
},
]);
@ -892,6 +926,7 @@ describe('bulkExecute()', () => {
isSystemAction: true,
},
],
configurationUtilities: mockActionsConfig,
});
mockedActionTypeRegistry.isActionExecutable.mockImplementation(() => true);
savedObjectsClient.bulkGet.mockResolvedValueOnce({
@ -927,9 +962,64 @@ describe('bulkExecute()', () => {
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'mock-action',
},
]);
expect(mockedActionTypeRegistry.ensureActionTypeEnabled).not.toHaveBeenCalled();
});
test('returns queuedActionsLimitError response when the max number of queued actions has been reached', async () => {
mockTaskManager.aggregate.mockResolvedValue({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 2, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
});
mockActionsConfig.getMaxQueued.mockReturnValueOnce(2);
const executeFn = createBulkExecutionEnqueuerFunction({
taskManager: mockTaskManager,
actionTypeRegistry: actionTypeRegistryMock.create(),
isESOCanEncrypt: true,
inMemoryConnectors: [],
configurationUtilities: mockActionsConfig,
});
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [],
});
savedObjectsClient.bulkCreate.mockResolvedValueOnce({
saved_objects: [],
});
expect(
await executeFn(savedObjectsClient, [
{
id: '123',
params: { baz: false },
spaceId: 'default',
executionId: '123abc',
apiKey: null,
source: asHttpRequestExecutionSource(request),
actionTypeId: 'mock-action',
},
])
).toMatchInlineSnapshot(`
Object {
"errors": true,
"items": Array [
Object {
"actionTypeId": "mock-action",
"id": "123",
"response": "queuedActionsLimitError",
},
],
}
`);
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
expect(mockTaskManager.bulkSchedule.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [],
]
`);
});
});

View file

@ -16,12 +16,15 @@ import {
import { ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE } from './constants/saved_objects';
import { ExecuteOptions as ActionExecutorOptions } from './lib/action_executor';
import { extractSavedObjectReferences, isSavedObjectExecutionSource } from './lib';
import { ActionsConfigurationUtilities } from './actions_config';
import { hasReachedTheQueuedActionsLimit } from './lib/has_reached_queued_actions_limit';
interface CreateExecuteFunctionOptions {
taskManager: TaskManagerStartContract;
isESOCanEncrypt: boolean;
actionTypeRegistry: ActionTypeRegistryContract;
inMemoryConnectors: InMemoryConnector[];
configurationUtilities: ActionsConfigurationUtilities;
}
export interface ExecuteOptions
@ -30,6 +33,7 @@ export interface ExecuteOptions
spaceId: string;
apiKey: string | null;
executionId: string;
actionTypeId: string;
}
interface ActionTaskParams
@ -54,12 +58,29 @@ export type BulkExecutionEnqueuer<T> = (
actionsToExectute: ExecuteOptions[]
) => Promise<T>;
export enum ExecutionResponseType {
SUCCESS = 'success',
QUEUED_ACTIONS_LIMIT_ERROR = 'queuedActionsLimitError',
}
export interface ExecutionResponse {
errors: boolean;
items: ExecutionResponseItem[];
}
export interface ExecutionResponseItem {
id: string;
actionTypeId: string;
response: ExecutionResponseType;
}
export function createBulkExecutionEnqueuerFunction({
taskManager,
actionTypeRegistry,
isESOCanEncrypt,
inMemoryConnectors,
}: CreateExecuteFunctionOptions): BulkExecutionEnqueuer<void> {
configurationUtilities,
}: CreateExecuteFunctionOptions): BulkExecutionEnqueuer<ExecutionResponse> {
return async function execute(
unsecuredSavedObjectsClient: SavedObjectsClientContract,
actionsToExecute: ExecuteOptions[]
@ -70,6 +91,19 @@ export function createBulkExecutionEnqueuerFunction({
);
}
const { hasReachedLimit, numberOverLimit } = await hasReachedTheQueuedActionsLimit(
taskManager,
configurationUtilities,
actionsToExecute.length
);
let actionsOverLimit: ExecuteOptions[] = [];
if (hasReachedLimit) {
actionsOverLimit = actionsToExecute.splice(
actionsToExecute.length - numberOverLimit,
numberOverLimit
);
}
const actionTypeIds: Record<string, string> = {};
const spaceIds: Record<string, string> = {};
const connectorIsInMemory: Record<string, boolean> = {};
@ -144,6 +178,22 @@ export function createBulkExecutionEnqueuerFunction({
};
});
await taskManager.bulkSchedule(taskInstances);
return {
errors: actionsOverLimit.length > 0,
items: actionsToExecute
.map((a) => ({
id: a.id,
actionTypeId: a.actionTypeId,
response: ExecutionResponseType.SUCCESS,
}))
.concat(
actionsOverLimit.map((a) => ({
id: a.id,
actionTypeId: a.actionTypeId,
response: ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR,
}))
),
};
};
}

View file

@ -14,11 +14,23 @@ import {
asNotificationExecutionSource,
asSavedObjectExecutionSource,
} from './lib/action_execution_source';
import { actionsConfigMock } from './actions_config.mock';
const mockTaskManager = taskManagerMock.createStart();
const internalSavedObjectsRepository = savedObjectsRepositoryMock.create();
const mockActionsConfig = actionsConfigMock.create();
beforeEach(() => jest.resetAllMocks());
beforeEach(() => {
jest.resetAllMocks();
mockTaskManager.aggregate.mockResolvedValue({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 0, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
});
mockActionsConfig.getMaxQueued.mockReturnValue(10);
});
describe('bulkExecute()', () => {
test.each([
@ -42,6 +54,7 @@ describe('bulkExecute()', () => {
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
internalSavedObjectsRepository.bulkCreate.mockResolvedValueOnce({
@ -154,6 +167,7 @@ describe('bulkExecute()', () => {
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
internalSavedObjectsRepository.bulkCreate.mockResolvedValueOnce({
@ -278,6 +292,7 @@ describe('bulkExecute()', () => {
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
internalSavedObjectsRepository.bulkCreate.mockResolvedValueOnce({
@ -426,6 +441,7 @@ describe('bulkExecute()', () => {
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
await expect(
executeFn(internalSavedObjectsRepository, [
@ -468,6 +484,7 @@ describe('bulkExecute()', () => {
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
mockedConnectorTypeRegistry.ensureActionTypeEnabled.mockImplementation(() => {
throw new Error('Fail');
@ -521,6 +538,7 @@ describe('bulkExecute()', () => {
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
await expect(
executeFn(internalSavedObjectsRepository, [
@ -540,4 +558,57 @@ describe('bulkExecute()', () => {
);
}
);
test.each([
[true, false],
[false, true],
])(
'returns queuedActionsLimitError response when the max number of queued actions has been reached: %s, isSystemAction: %s',
async (isPreconfigured, isSystemAction) => {
mockTaskManager.aggregate.mockResolvedValue({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 2, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
});
mockActionsConfig.getMaxQueued.mockReturnValueOnce(2);
const executeFn = createBulkUnsecuredExecutionEnqueuerFunction({
taskManager: mockTaskManager,
connectorTypeRegistry: actionTypeRegistryMock.create(),
inMemoryConnectors: [
{
id: '123',
actionTypeId: '.email',
config: {},
isPreconfigured,
isDeprecated: false,
isSystemAction,
name: 'x',
secrets: {},
},
],
configurationUtilities: mockActionsConfig,
});
internalSavedObjectsRepository.bulkCreate.mockResolvedValueOnce({
saved_objects: [],
});
expect(
await executeFn(internalSavedObjectsRepository, [
{
id: '123',
params: { baz: false },
source: asNotificationExecutionSource({ connectorId: 'abc', requesterId: 'foo' }),
},
])
).toEqual({ errors: true, items: [{ id: '123', response: 'queuedActionsLimitError' }] });
expect(mockTaskManager.bulkSchedule).toHaveBeenCalledTimes(1);
expect(mockTaskManager.bulkSchedule.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [],
]
`);
}
);
});

View file

@ -14,6 +14,9 @@ import {
import { ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE } from './constants/saved_objects';
import { ExecuteOptions as ActionExecutorOptions } from './lib/action_executor';
import { extractSavedObjectReferences, isSavedObjectExecutionSource } from './lib';
import { ExecutionResponseItem, ExecutionResponseType } from './create_execute_function';
import { ActionsConfigurationUtilities } from './actions_config';
import { hasReachedTheQueuedActionsLimit } from './lib/has_reached_queued_actions_limit';
// This allowlist should only contain connector types that don't require API keys for
// execution.
@ -22,6 +25,7 @@ interface CreateBulkUnsecuredExecuteFunctionOptions {
taskManager: TaskManagerStartContract;
connectorTypeRegistry: ConnectorTypeRegistryContract;
inMemoryConnectors: InMemoryConnector[];
configurationUtilities: ActionsConfigurationUtilities;
}
export interface ExecuteOptions
@ -29,6 +33,11 @@ export interface ExecuteOptions
id: string;
}
export interface ExecutionResponse {
errors: boolean;
items: ExecutionResponseItem[];
}
interface ActionTaskParams
extends Pick<ActionExecutorOptions, 'actionId' | 'params' | 'relatedSavedObjects'> {
apiKey: string | null;
@ -43,11 +52,25 @@ export function createBulkUnsecuredExecutionEnqueuerFunction({
taskManager,
connectorTypeRegistry,
inMemoryConnectors,
}: CreateBulkUnsecuredExecuteFunctionOptions): BulkUnsecuredExecutionEnqueuer<void> {
configurationUtilities,
}: CreateBulkUnsecuredExecuteFunctionOptions): BulkUnsecuredExecutionEnqueuer<ExecutionResponse> {
return async function execute(
internalSavedObjectsRepository: ISavedObjectsRepository,
actionsToExecute: ExecuteOptions[]
) {
const { hasReachedLimit, numberOverLimit } = await hasReachedTheQueuedActionsLimit(
taskManager,
configurationUtilities,
actionsToExecute.length
);
let actionsOverLimit: ExecuteOptions[] = [];
if (hasReachedLimit) {
actionsOverLimit = actionsToExecute.splice(
actionsToExecute.length - numberOverLimit,
numberOverLimit
);
}
const connectorTypeIds: Record<string, string> = {};
const connectorIds = [...new Set(actionsToExecute.map((action) => action.id))];
@ -131,6 +154,23 @@ export function createBulkUnsecuredExecutionEnqueuerFunction({
};
});
await taskManager.bulkSchedule(taskInstances);
return {
errors: actionsOverLimit.length > 0,
items: actionsToExecute
.map((a) => ({
id: a.id,
response: ExecutionResponseType.SUCCESS,
actionTypeId: connectorTypeIds[a.id],
}))
.concat(
actionsOverLimit.map((a) => ({
id: a.id,
response: ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR,
actionTypeId: connectorTypeIds[a.id],
}))
),
};
};
}

View file

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import { actionsConfigMock } from '../actions_config.mock';
import { hasReachedTheQueuedActionsLimit } from './has_reached_queued_actions_limit';
const mockTaskManager = taskManagerMock.createStart();
const mockActionsConfig = actionsConfigMock.create();
beforeEach(() => {
jest.resetAllMocks();
mockTaskManager.aggregate.mockResolvedValue({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 0, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
});
mockActionsConfig.getMaxQueued.mockReturnValue(10);
});
describe('hasReachedTheQueuedActionsLimit()', () => {
test('returns true if the number of queued actions is greater than the config limit', async () => {
mockTaskManager.aggregate.mockResolvedValue({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 3, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
});
mockActionsConfig.getMaxQueued.mockReturnValueOnce(2);
expect(await hasReachedTheQueuedActionsLimit(mockTaskManager, mockActionsConfig, 1)).toEqual({
hasReachedLimit: true,
numberOverLimit: 2,
});
});
test('returns true if the number of queued actions is equal the config limit', async () => {
mockTaskManager.aggregate.mockResolvedValue({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 2, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
});
mockActionsConfig.getMaxQueued.mockReturnValueOnce(3);
expect(await hasReachedTheQueuedActionsLimit(mockTaskManager, mockActionsConfig, 1)).toEqual({
hasReachedLimit: true,
numberOverLimit: 0,
});
});
test('returns false if the number of queued actions is less than the config limit', async () => {
mockTaskManager.aggregate.mockResolvedValue({
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 1, relation: 'eq' }, max_score: null, hits: [] },
aggregations: {},
});
mockActionsConfig.getMaxQueued.mockReturnValueOnce(3);
expect(await hasReachedTheQueuedActionsLimit(mockTaskManager, mockActionsConfig, 1)).toEqual({
hasReachedLimit: false,
numberOverLimit: 0,
});
});
});

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { ActionsConfigurationUtilities } from '../actions_config';
export async function hasReachedTheQueuedActionsLimit(
taskManager: TaskManagerStartContract,
configurationUtilities: ActionsConfigurationUtilities,
numberOfActions: number
) {
const limit = configurationUtilities.getMaxQueued();
const {
hits: { total },
} = await taskManager.aggregate({
query: {
bool: {
filter: {
bool: {
must: [
{
term: {
'task.scope': 'actions',
},
},
],
},
},
},
},
aggs: {},
});
const tasks = typeof total === 'number' ? total : total?.value ?? 0;
const numberOfTasks = tasks + numberOfActions;
const hasReachedLimit = numberOfTasks >= limit;
return {
hasReachedLimit,
numberOverLimit: hasReachedLimit ? numberOfTasks - limit : 0,
};
}

View file

@ -301,7 +301,7 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
core.http.registerRouteHandlerContext<ActionsRequestHandlerContext, 'actions'>(
'actions',
this.createRouteHandlerContext(core)
this.createRouteHandlerContext(core, actionsConfigUtils)
);
if (usageCollection) {
const eventLogIndex = this.eventLogService.getIndexPattern();
@ -404,8 +404,11 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
isESOCanEncrypt,
instantiateAuthorization,
getUnsecuredSavedObjectsClient,
actionsConfig,
} = this;
const actionsConfigUtils = getActionsConfigurationUtilities(actionsConfig);
licenseState?.setNotifyUsage(plugins.licensing.featureUsage.notifyUsage);
const encryptedSavedObjectsClient = plugins.encryptedSavedObjects.getClient({
@ -457,12 +460,14 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
actionTypeRegistry: actionTypeRegistry!,
isESOCanEncrypt: isESOCanEncrypt!,
inMemoryConnectors: this.inMemoryConnectors,
configurationUtilities: actionsConfigUtils,
}),
bulkExecutionEnqueuer: createBulkExecutionEnqueuerFunction({
taskManager: plugins.taskManager,
actionTypeRegistry: actionTypeRegistry!,
isESOCanEncrypt: isESOCanEncrypt!,
inMemoryConnectors: this.inMemoryConnectors,
configurationUtilities: actionsConfigUtils,
}),
auditLogger: this.security?.audit.asScoped(request),
usageCounter: this.usageCounter,
@ -488,6 +493,7 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
taskManager: plugins.taskManager,
connectorTypeRegistry: actionTypeRegistry!,
inMemoryConnectors: this.inMemoryConnectors,
configurationUtilities: actionsConfigUtils,
}),
});
};
@ -641,7 +647,8 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
};
private createRouteHandlerContext = (
core: CoreSetup<ActionsPluginsStart>
core: CoreSetup<ActionsPluginsStart>,
actionsConfigUtils: ActionsConfigurationUtilities
): IContextProvider<ActionsRequestHandlerContext, 'actions'> => {
const {
actionTypeRegistry,
@ -687,12 +694,14 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
actionTypeRegistry: actionTypeRegistry!,
isESOCanEncrypt: isESOCanEncrypt!,
inMemoryConnectors,
configurationUtilities: actionsConfigUtils,
}),
bulkExecutionEnqueuer: createBulkExecutionEnqueuerFunction({
taskManager,
actionTypeRegistry: actionTypeRegistry!,
isESOCanEncrypt: isESOCanEncrypt!,
inMemoryConnectors,
configurationUtilities: actionsConfigUtils,
}),
auditLogger: security?.audit.asScoped(request),
usageCounter,

View file

@ -9,6 +9,7 @@ import { ISavedObjectsRepository } from '@kbn/core/server';
import {
BulkUnsecuredExecutionEnqueuer,
ExecuteOptions,
ExecutionResponse,
} from '../create_unsecured_execute_function';
import { asNotificationExecutionSource } from '../lib';
@ -24,16 +25,19 @@ const ALLOWED_REQUESTER_IDS = [
export interface UnsecuredActionsClientOpts {
internalSavedObjectsRepository: ISavedObjectsRepository;
executionEnqueuer: BulkUnsecuredExecutionEnqueuer<void>;
executionEnqueuer: BulkUnsecuredExecutionEnqueuer<ExecutionResponse>;
}
export interface IUnsecuredActionsClient {
bulkEnqueueExecution: (requesterId: string, actionsToExecute: ExecuteOptions[]) => Promise<void>;
bulkEnqueueExecution: (
requesterId: string,
actionsToExecute: ExecuteOptions[]
) => Promise<ExecutionResponse>;
}
export class UnsecuredActionsClient {
private readonly internalSavedObjectsRepository: ISavedObjectsRepository;
private readonly executionEnqueuer: BulkUnsecuredExecutionEnqueuer<void>;
private readonly executionEnqueuer: BulkUnsecuredExecutionEnqueuer<ExecutionResponse>;
constructor(params: UnsecuredActionsClientOpts) {
this.executionEnqueuer = params.executionEnqueuer;
@ -43,7 +47,7 @@ export class UnsecuredActionsClient {
public async bulkEnqueueExecution(
requesterId: string,
actionsToExecute: ExecuteOptions[]
): Promise<void> {
): Promise<ExecutionResponse> {
// Check that requesterId is allowed
if (!ALLOWED_REQUESTER_IDS.includes(requesterId)) {
throw new Error(

View file

@ -40,6 +40,7 @@ export const ruleExecutionStatusErrorReason = {
export const ruleExecutionStatusWarningReason = {
MAX_EXECUTABLE_ACTIONS: 'maxExecutableActions',
MAX_ALERTS: 'maxAlerts',
MAX_QUEUED_ACTIONS: 'maxQueuedActions',
} as const;
export type RuleNotifyWhen = typeof ruleNotifyWhen[keyof typeof ruleNotifyWhen];

View file

@ -110,6 +110,7 @@ export const ruleExecutionStatusSchema = schema.object({
reason: schema.oneOf([
schema.literal(ruleExecutionStatusWarningReasonV1.MAX_EXECUTABLE_ACTIONS),
schema.literal(ruleExecutionStatusWarningReasonV1.MAX_ALERTS),
schema.literal(ruleExecutionStatusWarningReasonV1.MAX_QUEUED_ACTIONS),
]),
message: schema.string(),
})
@ -136,6 +137,7 @@ export const ruleLastRunSchema = schema.object({
schema.literal(ruleExecutionStatusErrorReasonV1.VALIDATE),
schema.literal(ruleExecutionStatusWarningReasonV1.MAX_EXECUTABLE_ACTIONS),
schema.literal(ruleExecutionStatusWarningReasonV1.MAX_ALERTS),
schema.literal(ruleExecutionStatusWarningReasonV1.MAX_QUEUED_ACTIONS),
])
)
),

View file

@ -60,6 +60,7 @@ export enum RuleExecutionStatusErrorReasons {
export enum RuleExecutionStatusWarningReasons {
MAX_EXECUTABLE_ACTIONS = 'maxExecutableActions',
MAX_ALERTS = 'maxAlerts',
MAX_QUEUED_ACTIONS = 'maxQueuedActions',
}
export type RuleAlertingOutcome = 'failure' | 'success' | 'unknown' | 'warning';

View file

@ -40,4 +40,5 @@ export const ruleExecutionStatusErrorReason = {
export const ruleExecutionStatusWarningReason = {
MAX_EXECUTABLE_ACTIONS: 'maxExecutableActions',
MAX_ALERTS: 'maxAlerts',
MAX_QUEUED_ACTIONS: 'maxQueuedActions',
} as const;

View file

@ -55,6 +55,7 @@ export const ruleExecutionStatusSchema = schema.object({
reason: schema.oneOf([
schema.literal(ruleExecutionStatusWarningReason.MAX_EXECUTABLE_ACTIONS),
schema.literal(ruleExecutionStatusWarningReason.MAX_ALERTS),
schema.literal(ruleExecutionStatusWarningReason.MAX_QUEUED_ACTIONS),
]),
message: schema.string(),
})
@ -81,6 +82,7 @@ export const ruleLastRunSchema = schema.object({
schema.literal(ruleExecutionStatusErrorReason.VALIDATE),
schema.literal(ruleExecutionStatusWarningReason.MAX_EXECUTABLE_ACTIONS),
schema.literal(ruleExecutionStatusWarningReason.MAX_ALERTS),
schema.literal(ruleExecutionStatusWarningReason.MAX_QUEUED_ACTIONS),
])
)
),

View file

@ -21,6 +21,10 @@ export const translations = {
defaultMessage:
'Rule reported more than the maximum number of alerts in a single run. Alerts may be missed and recovery notifications may be delayed',
}),
maxQueuedActions: i18n.translate('xpack.alerting.taskRunner.warning.maxQueuedActions', {
defaultMessage:
'The maximum number of queued actions was reached; excess actions were not triggered.',
}),
},
},
};

View file

@ -40,4 +40,5 @@ export const ruleExecutionStatusErrorReasonAttributes = {
export const ruleExecutionStatusWarningReasonAttributes = {
MAX_EXECUTABLE_ACTIONS: 'maxExecutableActions',
MAX_ALERTS: 'maxAlerts',
MAX_QUEUED_ACTIONS: 'maxQueuedActions',
} as const;

View file

@ -728,6 +728,7 @@ describe('AlertingEventLogger', () => {
totalSearchDurationMs: 10333,
hasReachedAlertLimit: false,
triggeredActionsStatus: ActionsCompletion.COMPLETE,
hasReachedQueuedActionsLimit: false,
},
});
@ -826,6 +827,7 @@ describe('AlertingEventLogger', () => {
totalSearchDurationMs: 10333,
hasReachedAlertLimit: false,
triggeredActionsStatus: ActionsCompletion.COMPLETE,
hasReachedQueuedActionsLimit: false,
},
timings: {
[TaskRunnerTimerSpan.StartTaskRun]: 10,

View file

@ -13,6 +13,7 @@ import { RuleResultServiceResults, RuleResultService } from '../monitoring/rule_
const getMetrics = ({
hasReachedAlertLimit = false,
triggeredActionsStatus = ActionsCompletion.COMPLETE,
hasReachedQueuedActionsLimit = false,
}): RuleRunMetrics => {
return {
triggeredActionsStatus,
@ -25,6 +26,7 @@ const getMetrics = ({
numberOfTriggeredActions: 5,
totalSearchDurationMs: 2,
hasReachedAlertLimit,
hasReachedQueuedActionsLimit,
};
};
@ -126,6 +128,31 @@ describe('lastRunFromState', () => {
});
});
it('returns warning if rules actions completition is partial and queued action circuit breaker opens', () => {
const result = lastRunFromState(
{
metrics: getMetrics({
triggeredActionsStatus: ActionsCompletion.PARTIAL,
hasReachedQueuedActionsLimit: true,
}),
},
getRuleResultService({})
);
expect(result.lastRun.outcome).toEqual('warning');
expect(result.lastRun.outcomeMsg).toEqual([
'The maximum number of queued actions was reached; excess actions were not triggered.',
]);
expect(result.lastRun.warning).toEqual('maxQueuedActions');
expect(result.lastRun.alertsCount).toEqual({
active: 10,
new: 12,
recovered: 11,
ignored: 0,
});
});
it('overwrites rule execution warning if rule has reached alert limit; outcome messages are merged', () => {
const ruleExecutionOutcomeMessage = 'Rule execution reported a warning';
const frameworkOutcomeMessage =
@ -184,6 +211,38 @@ describe('lastRunFromState', () => {
});
});
it('overwrites rule execution warning if rule has reached queued action limit; outcome messages are merged', () => {
const ruleExecutionOutcomeMessage = 'Rule execution reported a warning';
const frameworkOutcomeMessage =
'The maximum number of queued actions was reached; excess actions were not triggered.';
const result = lastRunFromState(
{
metrics: getMetrics({
triggeredActionsStatus: ActionsCompletion.PARTIAL,
hasReachedQueuedActionsLimit: true,
}),
},
getRuleResultService({
warnings: ['MOCK_WARNING'],
outcomeMessage: 'Rule execution reported a warning',
})
);
expect(result.lastRun.outcome).toEqual('warning');
expect(result.lastRun.outcomeMsg).toEqual([
frameworkOutcomeMessage,
ruleExecutionOutcomeMessage,
]);
expect(result.lastRun.warning).toEqual('maxQueuedActions');
expect(result.lastRun.alertsCount).toEqual({
active: 10,
new: 12,
recovered: 11,
ignored: 0,
});
});
it('overwrites warning outcome to error if rule execution reports an error', () => {
const result = lastRunFromState(
{

View file

@ -48,8 +48,13 @@ export const lastRunFromState = (
outcomeMsg.push(translations.taskRunner.warning.maxAlerts);
} else if (metrics.triggeredActionsStatus === ActionsCompletion.PARTIAL) {
outcome = RuleLastRunOutcomeValues[1];
warning = RuleExecutionStatusWarningReasons.MAX_EXECUTABLE_ACTIONS;
outcomeMsg.push(translations.taskRunner.warning.maxExecutableActions);
if (metrics.hasReachedQueuedActionsLimit) {
warning = RuleExecutionStatusWarningReasons.MAX_QUEUED_ACTIONS;
outcomeMsg.push(translations.taskRunner.warning.maxQueuedActions);
} else {
warning = RuleExecutionStatusWarningReasons.MAX_EXECUTABLE_ACTIONS;
outcomeMsg.push(translations.taskRunner.warning.maxExecutableActions);
}
}
// Overwrite outcome to be error if last run reported any errors

View file

@ -30,6 +30,7 @@ const executionMetrics = {
numberOfRecoveredAlerts: 13,
hasReachedAlertLimit: false,
triggeredActionsStatus: ActionsCompletion.COMPLETE,
hasReachedQueuedActionsLimit: false,
};
describe('RuleExecutionStatus', () => {
@ -48,6 +49,7 @@ describe('RuleExecutionStatus', () => {
expect(received.numberOfNewAlerts).toEqual(expected.numberOfNewAlerts);
expect(received.hasReachedAlertLimit).toEqual(expected.hasReachedAlertLimit);
expect(received.triggeredActionsStatus).toEqual(expected.triggeredActionsStatus);
expect(received.hasReachedQueuedActionsLimit).toEqual(expected.hasReachedQueuedActionsLimit);
}
describe('executionStatusFromState()', () => {
@ -107,6 +109,30 @@ describe('RuleExecutionStatus', () => {
});
});
test('task state with max queued actions warning', () => {
const { status, metrics } = executionStatusFromState({
alertInstances: { a: {} },
metrics: {
...executionMetrics,
triggeredActionsStatus: ActionsCompletion.PARTIAL,
hasReachedQueuedActionsLimit: true,
},
});
checkDateIsNearNow(status.lastExecutionDate);
expect(status.warning).toEqual({
message: translations.taskRunner.warning.maxQueuedActions,
reason: RuleExecutionStatusWarningReasons.MAX_QUEUED_ACTIONS,
});
expect(status.status).toBe('warning');
expect(status.error).toBe(undefined);
testExpectedMetrics(metrics!, {
...executionMetrics,
triggeredActionsStatus: ActionsCompletion.PARTIAL,
hasReachedQueuedActionsLimit: true,
});
});
test('task state with max alerts warning', () => {
const { status, metrics } = executionStatusFromState({
alertInstances: { a: {} },

View file

@ -47,10 +47,17 @@ export function executionStatusFromState(
};
} else if (stateWithMetrics.metrics.triggeredActionsStatus === ActionsCompletion.PARTIAL) {
status = RuleExecutionStatusValues[5];
warning = {
reason: RuleExecutionStatusWarningReasons.MAX_EXECUTABLE_ACTIONS,
message: translations.taskRunner.warning.maxExecutableActions,
};
if (stateWithMetrics.metrics.hasReachedQueuedActionsLimit) {
warning = {
reason: RuleExecutionStatusWarningReasons.MAX_QUEUED_ACTIONS,
message: translations.taskRunner.warning.maxQueuedActions,
};
} else {
warning = {
reason: RuleExecutionStatusWarningReasons.MAX_EXECUTABLE_ACTIONS,
message: translations.taskRunner.warning.maxExecutableActions,
};
}
}
return {

View file

@ -25,6 +25,7 @@ describe('RuleRunMetricsStore', () => {
expect(ruleRunMetricsStore.getNumberOfNewAlerts()).toBe(0);
expect(ruleRunMetricsStore.getStatusByConnectorType('any')).toBe(undefined);
expect(ruleRunMetricsStore.getHasReachedAlertLimit()).toBe(false);
expect(ruleRunMetricsStore.getHasReachedQueuedActionsLimit()).toBe(false);
});
test('sets and returns numSearches', () => {
@ -95,6 +96,11 @@ describe('RuleRunMetricsStore', () => {
expect(metricsStore.getEsSearchDurationMs()).toEqual(555);
});
test('sets and returns hasReachedQueuedActionsLimit', () => {
ruleRunMetricsStore.setHasReachedQueuedActionsLimit(true);
expect(ruleRunMetricsStore.getHasReachedQueuedActionsLimit()).toBe(true);
});
test('gets metrics', () => {
expect(ruleRunMetricsStore.getMetrics()).toEqual({
triggeredActionsStatus: 'partial',
@ -107,6 +113,7 @@ describe('RuleRunMetricsStore', () => {
numberOfTriggeredActions: 5,
totalSearchDurationMs: 2,
hasReachedAlertLimit: true,
hasReachedQueuedActionsLimit: true,
});
});
@ -150,6 +157,19 @@ describe('RuleRunMetricsStore', () => {
).toBe(1);
});
// decrement
test('decrements numberOfTriggeredActions by 1', () => {
ruleRunMetricsStore.decrementNumberOfTriggeredActions();
expect(ruleRunMetricsStore.getNumberOfTriggeredActions()).toBe(5);
});
test('decrements numberOfTriggeredActionsByConnectorType by 1', () => {
ruleRunMetricsStore.decrementNumberOfTriggeredActionsByConnectorType(testConnectorId);
expect(
ruleRunMetricsStore.getStatusByConnectorType(testConnectorId).numberOfTriggeredActions
).toBe(0);
});
// Checker
test('checks if it has reached the executable actions limit', () => {
expect(ruleRunMetricsStore.hasReachedTheExecutableActionsLimit({ default: { max: 10 } })).toBe(

View file

@ -27,6 +27,7 @@ interface State {
numberOfGeneratedActions: number;
};
};
hasReachedQueuedActionsLimit: boolean;
}
export type RuleRunMetrics = Omit<State, 'connectorTypes'> & {
@ -44,6 +45,7 @@ export class RuleRunMetricsStore {
numberOfNewAlerts: 0,
hasReachedAlertLimit: false,
connectorTypes: {},
hasReachedQueuedActionsLimit: false,
};
// Getters
@ -90,6 +92,9 @@ export class RuleRunMetricsStore {
public getHasReachedAlertLimit = () => {
return this.state.hasReachedAlertLimit;
};
public getHasReachedQueuedActionsLimit = () => {
return this.state.hasReachedQueuedActionsLimit;
};
// Setters
public setSearchMetrics = (searchMetrics: SearchMetrics[]) => {
@ -135,6 +140,9 @@ export class RuleRunMetricsStore {
public setHasReachedAlertLimit = (hasReachedAlertLimit: boolean) => {
this.state.hasReachedAlertLimit = hasReachedAlertLimit;
};
public setHasReachedQueuedActionsLimit = (hasReachedQueuedActionsLimit: boolean) => {
this.state.hasReachedQueuedActionsLimit = hasReachedQueuedActionsLimit;
};
// Checkers
public hasReachedTheExecutableActionsLimit = (actionsConfigMap: ActionsConfigMap): boolean =>
@ -182,4 +190,13 @@ export class RuleRunMetricsStore {
const currentVal = this.state.connectorTypes[actionTypeId]?.numberOfGeneratedActions || 0;
set(this.state, `connectorTypes["${actionTypeId}"].numberOfGeneratedActions`, currentVal + 1);
};
// Decrementer
public decrementNumberOfTriggeredActions = () => {
this.state.numberOfTriggeredActions--;
};
public decrementNumberOfTriggeredActionsByConnectorType = (actionTypeId: string) => {
const currentVal = this.state.connectorTypes[actionTypeId]?.numberOfTriggeredActions || 0;
set(this.state, `connectorTypes["${actionTypeId}"].numberOfTriggeredActions`, currentVal - 1);
};
}

View file

@ -10,6 +10,7 @@ import { schema } from '@kbn/config-schema';
const executionStatusWarningReason = schema.oneOf([
schema.literal('maxExecutableActions'),
schema.literal('maxAlerts'),
schema.literal('maxQueuedActions'),
]);
const executionStatusErrorReason = schema.oneOf([

View file

@ -34,6 +34,7 @@ import sinon from 'sinon';
import { mockAAD } from './fixtures';
import { schema } from '@kbn/config-schema';
import { alertsClientMock } from '../alerts_client/alerts_client.mock';
import { ExecutionResponseType } from '@kbn/actions-plugin/server/create_execute_function';
jest.mock('./inject_action_params', () => ({
injectActionParams: jest.fn(),
@ -137,6 +138,11 @@ const defaultExecutionParams = {
alertsClient,
};
const defaultExecutionResponse = {
errors: false,
items: [{ actionTypeId: 'test', id: '1', response: ExecutionResponseType.SUCCESS }],
};
let ruleRunMetricsStore: RuleRunMetricsStore;
let clock: sinon.SinonFakeTimers;
type ActiveActionGroup = 'default' | 'other-group';
@ -223,6 +229,7 @@ describe('Execution Handler', () => {
renderActionParameterTemplatesDefault
);
ruleRunMetricsStore = new RuleRunMetricsStore();
actionsClient.bulkEnqueueExecution.mockResolvedValue(defaultExecutionResponse);
});
beforeAll(() => {
clock = sinon.useFakeTimers();
@ -238,39 +245,40 @@ describe('Execution Handler', () => {
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(1);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Array [
Array [
Object {
"actionTypeId": "test",
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 1 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 1 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
]
`);
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
],
]
`);
expect(alertingEventLogger.logAction).toHaveBeenCalledTimes(1);
expect(alertingEventLogger.logAction).toHaveBeenNthCalledWith(1, {
@ -334,6 +342,7 @@ describe('Execution Handler', () => {
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledWith([
{
actionTypeId: 'test2',
consumer: 'rule-consumer',
id: '2',
params: {
@ -423,39 +432,40 @@ describe('Execution Handler', () => {
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(1);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Array [
Array [
Object {
"actionTypeId": "test",
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My context-val goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My context-val goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
]
`);
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
],
]
`);
});
test('state attribute gets parameterized', async () => {
@ -463,39 +473,40 @@ describe('Execution Handler', () => {
await executionHandler.run(generateAlert({ id: 2, state: { value: 'state-val' } }));
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Array [
Array [
Object {
"actionTypeId": "test",
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My state-val goes here",
},
"relatedSavedObjects": Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 2 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My state-val goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
]
`);
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
],
]
`);
});
test(`logs an error when action group isn't part of actionGroups available for the ruleType`, async () => {
@ -514,6 +525,21 @@ describe('Execution Handler', () => {
});
test('Stops triggering actions when the number of total triggered actions is reached the number of max executable actions', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
actionTypeId: 'test2',
id: '1',
response: ExecutionResponseType.SUCCESS,
},
{
actionTypeId: 'test2',
id: '2',
response: ExecutionResponseType.SUCCESS,
},
],
});
const actions = [
{
id: '1',
@ -573,6 +599,27 @@ describe('Execution Handler', () => {
});
test('Skips triggering actions for a specific action type when it reaches the limit for that specific action type', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{ actionTypeId: 'test', id: '1', response: ExecutionResponseType.SUCCESS },
{
actionTypeId: 'test-action-type-id',
id: '2',
response: ExecutionResponseType.SUCCESS,
},
{
actionTypeId: 'another-action-type-id',
id: '4',
response: ExecutionResponseType.SUCCESS,
},
{
actionTypeId: 'another-action-type-id',
id: '5',
response: ExecutionResponseType.SUCCESS,
},
],
});
const actions = [
...defaultExecutionParams.rule.actions,
{
@ -652,6 +699,77 @@ describe('Execution Handler', () => {
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
});
test('Stops triggering actions when the number of total queued actions is reached the number of max queued actions', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: true,
items: [
{
actionTypeId: 'test',
id: '1',
response: ExecutionResponseType.SUCCESS,
},
{
actionTypeId: 'test',
id: '2',
response: ExecutionResponseType.SUCCESS,
},
{
actionTypeId: 'test',
id: '3',
response: ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR,
},
],
});
const actions = [
{
id: '1',
group: 'default',
actionTypeId: 'test',
params: {
foo: true,
contextVal: 'My other {{context.value}} goes here',
stateVal: 'My other {{state.value}} goes here',
},
},
{
id: '2',
group: 'default',
actionTypeId: 'test',
params: {
foo: true,
contextVal: 'My other {{context.value}} goes here',
stateVal: 'My other {{state.value}} goes here',
},
},
{
id: '3',
group: 'default',
actionTypeId: 'test',
params: {
foo: true,
contextVal: '{{context.value}} goes here',
stateVal: '{{state.value}} goes here',
},
},
];
const executionHandler = new ExecutionHandler(
generateExecutionParams({
...defaultExecutionParams,
rule: {
...defaultExecutionParams.rule,
actions,
},
})
);
await executionHandler.run(generateAlert({ id: 2, state: { value: 'state-val' } }));
expect(ruleRunMetricsStore.getNumberOfTriggeredActions()).toBe(2);
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(3);
expect(ruleRunMetricsStore.getTriggeredActionsStatus()).toBe(ActionsCompletion.PARTIAL);
expect(defaultExecutionParams.logger.debug).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
});
test('schedules alerts with recovered actions', async () => {
const actions = [
{
@ -680,39 +798,40 @@ describe('Execution Handler', () => {
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Array [
Array [
Object {
"actionTypeId": "test",
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 1 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 1 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
]
`);
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
],
]
`);
});
test('does not schedule alerts with recovered actions that are muted', async () => {
@ -852,6 +971,16 @@ describe('Execution Handler', () => {
});
test('triggers summary actions (per rule run)', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
actionTypeId: 'testActionTypeId',
id: '1',
response: ExecutionResponseType.SUCCESS,
},
],
});
alertsClient.getSummarizedAlerts.mockResolvedValue({
new: {
count: 1,
@ -895,36 +1024,37 @@ describe('Execution Handler', () => {
});
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Array [
Array [
Object {
"actionTypeId": "testActionTypeId",
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"message": "New: 1 Ongoing: 0 Recovered: 0",
},
"relatedSavedObjects": Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"message": "New: 1 Ongoing: 0 Recovered: 0",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
]
`);
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
],
]
`);
expect(alertingEventLogger.logAction).toBeCalledWith({
alertSummary: { new: 1, ongoing: 0, recovered: 0 },
id: '1',
@ -970,6 +1100,16 @@ describe('Execution Handler', () => {
});
test('triggers summary actions (custom interval)', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
actionTypeId: 'testActionTypeId',
id: '1',
response: ExecutionResponseType.SUCCESS,
},
],
});
alertsClient.getSummarizedAlerts.mockResolvedValue({
new: {
count: 1,
@ -1022,36 +1162,37 @@ describe('Execution Handler', () => {
});
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Array [
Array [
Object {
"actionTypeId": "testActionTypeId",
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"message": "New: 1 Ongoing: 0 Recovered: 0",
},
"relatedSavedObjects": Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"message": "New: 1 Ongoing: 0 Recovered: 0",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
]
`);
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
],
]
`);
expect(alertingEventLogger.logAction).toBeCalledWith({
alertSummary: { new: 1, ongoing: 0, recovered: 0 },
id: '1',
@ -1206,6 +1347,17 @@ describe('Execution Handler', () => {
});
test('schedules alerts with multiple recovered actions', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{ actionTypeId: 'test', id: '1', response: ExecutionResponseType.SUCCESS },
{
actionTypeId: 'test',
id: '2',
response: ExecutionResponseType.SUCCESS,
},
],
});
const actions = [
{
id: '1',
@ -1245,70 +1397,82 @@ describe('Execution Handler', () => {
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledTimes(1);
expect(actionsClient.bulkEnqueueExecution.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Array [
Array [
Array [
Object {
"actionTypeId": "test",
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 1 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "1",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 1 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
Object {
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "2",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 1 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
]
`);
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
Object {
"actionTypeId": "test",
"apiKey": "MTIzOmFiYw==",
"consumer": "rule-consumer",
"executionId": "5f6aa57d-3e22-484e-bae8-cbed868f4d28",
"id": "2",
"params": Object {
"alertVal": "My 1 name-of-alert test1 tag-A,tag-B 1 goes here",
"contextVal": "My goes here",
"foo": true,
"stateVal": "My goes here",
},
"relatedSavedObjects": Array [
Object {
"id": "1",
"namespace": "test1",
"type": "alert",
"typeId": "test",
},
],
"source": Object {
"source": Object {
"id": "1",
"type": "alert",
},
"type": "SAVED_OBJECT",
},
"spaceId": "test1",
},
],
]
`);
});
test('does not schedule actions for the summarized alerts that are filtered out (for each alert)', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
actionTypeId: 'testActionTypeId',
id: '1',
response: ExecutionResponseType.SUCCESS,
},
],
});
alertsClient.getSummarizedAlerts.mockResolvedValue({
new: {
count: 0,
@ -1372,6 +1536,16 @@ describe('Execution Handler', () => {
});
test('does not schedule actions for the summarized alerts that are filtered out (summary of alerts onThrottleInterval)', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
actionTypeId: 'testActionTypeId',
id: '1',
response: ExecutionResponseType.SUCCESS,
},
],
});
alertsClient.getSummarizedAlerts.mockResolvedValue({
new: {
count: 0,
@ -1432,6 +1606,16 @@ describe('Execution Handler', () => {
});
test('does not schedule actions for the for-each type alerts that are filtered out', async () => {
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
actionTypeId: 'testActionTypeId',
id: '1',
response: ExecutionResponseType.SUCCESS,
},
],
});
alertsClient.getSummarizedAlerts.mockResolvedValue({
new: {
count: 1,
@ -1486,6 +1670,7 @@ describe('Execution Handler', () => {
});
expect(actionsClient.bulkEnqueueExecution).toHaveBeenCalledWith([
{
actionTypeId: 'testActionTypeId',
apiKey: 'MTIzOmFiYw==',
consumer: 'rule-consumer',
executionId: '5f6aa57d-3e22-484e-bae8-cbed868f4d28',

View file

@ -10,7 +10,11 @@ import { Logger } from '@kbn/core/server';
import { getRuleDetailsRoute, triggersActionsRoute } from '@kbn/rule-data-utils';
import { asSavedObjectExecutionSource } from '@kbn/actions-plugin/server';
import { isEphemeralTaskRejectedDueToCapacityError } from '@kbn/task-manager-plugin/server';
import { ExecuteOptions as EnqueueExecutionOptions } from '@kbn/actions-plugin/server/create_execute_function';
import {
ExecuteOptions as EnqueueExecutionOptions,
ExecutionResponseItem,
ExecutionResponseType,
} from '@kbn/actions-plugin/server/create_execute_function';
import { ActionsCompletion } from '@kbn/alerting-state-types';
import { ActionsClient } from '@kbn/actions-plugin/server/actions_client';
import { chunk } from 'lodash';
@ -49,6 +53,18 @@ enum Reasons {
ACTION_GROUP_NOT_CHANGED = 'actionGroupHasNotChanged',
}
interface LogAction {
id: string;
typeId: string;
alertId?: string;
alertGroup?: string;
alertSummary?: {
new: number;
ongoing: number;
recovered: number;
};
}
export interface RunResult {
throttledSummaryActions: ThrottledActions;
}
@ -176,8 +192,9 @@ export class ExecutionHandler<
},
} = this;
const logActions = [];
const logActions: Record<string, LogAction> = {};
const bulkActions: EnqueueExecutionOptions[] = [];
let bulkActionsResponse: ExecutionResponseItem[] = [];
this.ruleRunMetricsStore.incrementNumberOfGeneratedActions(executables.length);
@ -262,7 +279,7 @@ export class ExecutionHandler<
throttledSummaryActions[action.uuid!] = { date: new Date().toISOString() };
}
logActions.push({
logActions[action.id] = {
id: action.id,
typeId: action.actionTypeId,
alertSummary: {
@ -270,7 +287,7 @@ export class ExecutionHandler<
ongoing: summarizedAlerts.ongoing.count,
recovered: summarizedAlerts.recovered.count,
},
});
};
} else {
const ruleUrl = this.buildRuleUrl(spaceId);
const actionToRun = {
@ -307,12 +324,12 @@ export class ExecutionHandler<
bulkActions,
});
logActions.push({
logActions[action.id] = {
id: action.id,
typeId: action.actionTypeId,
alertId: alert.getId(),
alertGroup: action.group,
});
};
if (!this.isRecoveredAlert(actionGroup)) {
if (isActionOnInterval(action)) {
@ -331,12 +348,40 @@ export class ExecutionHandler<
if (!!bulkActions.length) {
for (const c of chunk(bulkActions, CHUNK_SIZE)) {
await this.actionsClient!.bulkEnqueueExecution(c);
const response = await this.actionsClient!.bulkEnqueueExecution(c);
if (response.errors) {
bulkActionsResponse = bulkActionsResponse.concat(
response.items.filter(
(i) => i.response === ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR
)
);
}
}
}
if (!!logActions.length) {
for (const action of logActions) {
if (!!bulkActionsResponse.length) {
for (const r of bulkActionsResponse) {
if (r.response === ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR) {
ruleRunMetricsStore.setHasReachedQueuedActionsLimit(true);
ruleRunMetricsStore.decrementNumberOfTriggeredActions();
ruleRunMetricsStore.decrementNumberOfTriggeredActionsByConnectorType(r.actionTypeId);
ruleRunMetricsStore.setTriggeredActionsStatusByConnectorType({
actionTypeId: r.actionTypeId,
status: ActionsCompletion.PARTIAL,
});
logger.debug(
`Rule "${this.rule.id}" skipped scheduling action "${r.id}" because the maximum number of queued actions has been reached.`
);
delete logActions[r.id];
}
}
}
const logActionsValues = Object.values(logActions);
if (!!logActionsValues.length) {
for (const action of logActionsValues) {
alertingEventLogger.logAction(action);
}
}
@ -509,6 +554,7 @@ export class ExecutionHandler<
typeId: this.ruleType.id,
},
],
actionTypeId: action.actionTypeId,
};
}

View file

@ -395,13 +395,16 @@ export const generateEnqueueFunctionInput = ({
isBulk = false,
isResolved,
foo,
actionTypeId,
}: {
id: string;
isBulk?: boolean;
isResolved?: boolean;
foo?: boolean;
actionTypeId?: string;
}) => {
const input = {
actionTypeId: actionTypeId || 'action',
apiKey: 'MTIzOmFiYw==',
executionId: '5f6aa57d-3e22-484e-bae8-cbed868f4d28',
id,

View file

@ -237,6 +237,8 @@ describe('Task Runner', () => {
logger.get.mockImplementation(() => logger);
ruleType.executor.mockResolvedValue({ state: {} });
actionsClient.bulkEnqueueExecution.mockResolvedValue({ errors: false, items: [] });
});
test('successfully executes the task', async () => {
@ -299,7 +301,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
4,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({ status: 'ok' });
@ -381,7 +383,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
5,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({
@ -469,7 +471,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
6,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({
@ -723,7 +725,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
6,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":2,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":2,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":2,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":2,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}'
);
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
}
@ -1168,7 +1170,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
6,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":2,"numberOfGeneratedActions":2,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":1,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":2,"numberOfGeneratedActions":2,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":1,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({
@ -1295,7 +1297,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
6,
`ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":2,"numberOfGeneratedActions":2,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":1,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}`
`ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":2,"numberOfGeneratedActions":2,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":1,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}`
);
testAlertingEventLogCalls({
@ -1490,7 +1492,7 @@ describe('Task Runner', () => {
expect(enqueueFunction).toHaveBeenCalledTimes(1);
expect(enqueueFunction).toHaveBeenCalledWith(
generateEnqueueFunctionInput({ isBulk, id: '1', foo: true })
generateEnqueueFunctionInput({ isBulk, id: '1', foo: true, actionTypeId: 'slack' })
);
}
);
@ -1562,7 +1564,7 @@ describe('Task Runner', () => {
expect(enqueueFunction).toHaveBeenCalledTimes(1);
expect(enqueueFunction).toHaveBeenCalledWith(
generateEnqueueFunctionInput({ isBulk, id: '1', foo: true })
generateEnqueueFunctionInput({ isBulk, id: '1', foo: true, actionTypeId: 'slack' })
);
expect(result.state.summaryActions).toEqual({
'111-111': { date: new Date(DATE_1970).toISOString() },
@ -2440,7 +2442,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
4,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({
@ -2962,7 +2964,7 @@ describe('Task Runner', () => {
status: 'warning',
errorReason: `maxExecutableActions`,
logAlert: 4,
logAction: 5,
logAction: 3,
});
});
@ -3146,6 +3148,7 @@ describe('Task Runner', () => {
logAlert = 0,
logAction = 0,
hasReachedAlertLimit = false,
hasReachedQueuedActionsLimit = false,
}: {
status: string;
ruleContext?: RuleContextOpts;
@ -3162,6 +3165,7 @@ describe('Task Runner', () => {
errorReason?: string;
errorMessage?: string;
hasReachedAlertLimit?: boolean;
hasReachedQueuedActionsLimit?: boolean;
}) {
expect(alertingEventLogger.initialize).toHaveBeenCalledWith(ruleContext);
if (status !== 'skip') {
@ -3215,6 +3219,7 @@ describe('Task Runner', () => {
totalSearchDurationMs: 23423,
hasReachedAlertLimit,
triggeredActionsStatus: 'partial',
hasReachedQueuedActionsLimit,
},
status: {
lastExecutionDate: new Date('1970-01-01T00:00:00.000Z'),
@ -3250,6 +3255,7 @@ describe('Task Runner', () => {
totalSearchDurationMs: 23423,
hasReachedAlertLimit,
triggeredActionsStatus: 'complete',
hasReachedQueuedActionsLimit,
},
status: {
lastExecutionDate: new Date('1970-01-01T00:00:00.000Z'),

View file

@ -409,7 +409,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
5,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}'
);
expect(
taskRunnerFactoryInitializerParams.internalSavedObjectsRepository.update

View file

@ -191,6 +191,8 @@ describe('Task Runner Cancel', () => {
alertingEventLogger.getStartAndDuration.mockImplementation(() => ({ start: new Date() }));
(AlertingEventLogger as jest.Mock).mockImplementation(() => alertingEventLogger);
logger.get.mockImplementation(() => logger);
actionsClient.bulkEnqueueExecution.mockResolvedValue({ errors: false, items: [] });
});
test('updates rule saved object execution status and writes to event log entry when task is cancelled mid-execution', async () => {
@ -470,7 +472,7 @@ describe('Task Runner Cancel', () => {
);
expect(logger.debug).nthCalledWith(
8,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}'
);
}
@ -485,6 +487,7 @@ describe('Task Runner Cancel', () => {
logAlert = 0,
logAction = 0,
hasReachedAlertLimit = false,
hasReachedQueuedActionsLimit = false,
}: {
status: string;
ruleContext?: RuleContextOpts;
@ -497,6 +500,7 @@ describe('Task Runner Cancel', () => {
logAlert?: number;
logAction?: number;
hasReachedAlertLimit?: boolean;
hasReachedQueuedActionsLimit?: boolean;
}) {
expect(alertingEventLogger.initialize).toHaveBeenCalledWith(ruleContext);
expect(alertingEventLogger.start).toHaveBeenCalled();
@ -515,6 +519,7 @@ describe('Task Runner Cancel', () => {
totalSearchDurationMs: 23423,
hasReachedAlertLimit,
triggeredActionsStatus: 'complete',
hasReachedQueuedActionsLimit,
},
status: {
lastExecutionDate: new Date('1970-01-01T00:00:00.000Z'),

View file

@ -5,18 +5,35 @@
* 2.0.
*/
import { loggerMock } from '@kbn/logging-mocks';
import { unsecuredActionsClientMock } from '@kbn/actions-plugin/server/unsecured_actions_client/unsecured_actions_client.mock';
import { ConnectorsEmailService } from './connectors_email_service';
import type { PlainTextEmail, HTMLEmail } from './types';
import { ExecutionResponseType } from '@kbn/actions-plugin/server/create_execute_function';
const REQUESTER_ID = 'requesterId';
const CONNECTOR_ID = 'connectorId';
describe('sendPlainTextEmail()', () => {
const logger = loggerMock.create();
beforeEach(() => {
loggerMock.clear(logger);
});
describe('calls the provided ActionsClient#bulkEnqueueExecution() with the appropriate params', () => {
it(`omits the 'relatedSavedObjects' field if no context is provided`, () => {
const actionsClient = unsecuredActionsClientMock.create();
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient);
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
id: CONNECTOR_ID,
response: ExecutionResponseType.SUCCESS,
actionTypeId: 'test',
},
],
});
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient, logger);
const payload: PlainTextEmail = {
to: ['user1@email.com'],
subject: 'This is a notification email',
@ -40,7 +57,17 @@ describe('sendPlainTextEmail()', () => {
it(`populates the 'relatedSavedObjects' field if context is provided`, () => {
const actionsClient = unsecuredActionsClientMock.create();
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient);
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
id: CONNECTOR_ID,
response: ExecutionResponseType.SUCCESS,
actionTypeId: 'test',
},
],
});
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient, logger);
const payload: PlainTextEmail = {
to: ['user1@email.com', 'user2@email.com', 'user3@email.com'],
subject: 'This is a notification email',
@ -107,14 +134,53 @@ describe('sendPlainTextEmail()', () => {
},
]);
});
it(`logs an error when the maximum number of queued actions has been reached`, async () => {
const actionsClient = unsecuredActionsClientMock.create();
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: true,
items: [
{
id: CONNECTOR_ID,
response: ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR,
actionTypeId: 'test',
},
],
});
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient, logger);
const payload: PlainTextEmail = {
to: ['user1@email.com'],
subject: 'This is a notification email',
message: 'With some contents inside.',
};
await email.sendPlainTextEmail(payload);
expect(logger.warn).toHaveBeenCalled();
});
});
});
describe('sendHTMLEmail()', () => {
const logger = loggerMock.create();
beforeEach(() => {
loggerMock.clear(logger);
});
describe('calls the provided ActionsClient#bulkEnqueueExecution() with the appropriate params', () => {
it(`omits the 'relatedSavedObjects' field if no context is provided`, () => {
const actionsClient = unsecuredActionsClientMock.create();
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient);
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
id: CONNECTOR_ID,
response: ExecutionResponseType.SUCCESS,
actionTypeId: 'test',
},
],
});
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient, logger);
const payload: HTMLEmail = {
to: ['user1@email.com'],
subject: 'This is a notification email',
@ -140,7 +206,17 @@ describe('sendHTMLEmail()', () => {
it(`populates the 'relatedSavedObjects' field if context is provided`, () => {
const actionsClient = unsecuredActionsClientMock.create();
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient);
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: false,
items: [
{
id: CONNECTOR_ID,
response: ExecutionResponseType.SUCCESS,
actionTypeId: 'test',
},
],
});
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient, logger);
const payload: HTMLEmail = {
to: ['user1@email.com', 'user2@email.com', 'user3@email.com'],
subject: 'This is a notification email',
@ -211,5 +287,29 @@ describe('sendHTMLEmail()', () => {
},
]);
});
it(`logs an error when the maximum number of queued actions has been reached`, async () => {
const actionsClient = unsecuredActionsClientMock.create();
actionsClient.bulkEnqueueExecution.mockResolvedValueOnce({
errors: true,
items: [
{
id: CONNECTOR_ID,
response: ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR,
actionTypeId: 'test',
},
],
});
const email = new ConnectorsEmailService(REQUESTER_ID, CONNECTOR_ID, actionsClient, logger);
const payload: HTMLEmail = {
to: ['user1@email.com'],
subject: 'This is a notification email',
message: 'With some contents inside.',
messageHTML: '<html><body><span>With some contents inside.</span></body></html>',
};
await email.sendHTMLEmail(payload);
expect(logger.warn).toHaveBeenCalled();
});
});
});

View file

@ -6,13 +6,19 @@
*/
import type { IUnsecuredActionsClient } from '@kbn/actions-plugin/server';
import {
ExecutionResponseItem,
ExecutionResponseType,
} from '@kbn/actions-plugin/server/create_execute_function';
import type { Logger } from '@kbn/core/server';
import type { EmailService, PlainTextEmail, HTMLEmail } from './types';
export class ConnectorsEmailService implements EmailService {
constructor(
private requesterId: string,
private connectorId: string,
private actionsClient: IUnsecuredActionsClient
private actionsClient: IUnsecuredActionsClient,
private logger: Logger
) {}
async sendPlainTextEmail(params: PlainTextEmail): Promise<void> {
@ -25,7 +31,11 @@ export class ConnectorsEmailService implements EmailService {
},
relatedSavedObjects: params.context?.relatedObjects,
}));
return await this.actionsClient.bulkEnqueueExecution(this.requesterId, actions);
const response = await this.actionsClient.bulkEnqueueExecution(this.requesterId, actions);
if (response.errors) {
this.logEnqueueExecutionResponse(response.items);
}
}
async sendHTMLEmail(params: HTMLEmail): Promise<void> {
@ -40,6 +50,19 @@ export class ConnectorsEmailService implements EmailService {
relatedSavedObjects: params.context?.relatedObjects,
}));
return await this.actionsClient.bulkEnqueueExecution(this.requesterId, actions);
const response = await this.actionsClient.bulkEnqueueExecution(this.requesterId, actions);
if (response.errors) {
this.logEnqueueExecutionResponse(response.items);
}
}
private logEnqueueExecutionResponse(items: ExecutionResponseItem[]) {
for (const r of items) {
if (r.response === ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR) {
this.logger.warn(
`Skipped scheduling action "${r.id}" because the maximum number of queued actions has been reached.`
);
}
}
}
}

View file

@ -235,7 +235,8 @@ describe('ConnectorsEmailServiceProvider', () => {
expect(connectorsEmailServiceMock).toHaveBeenCalledWith(
PLUGIN_ID,
validConnectorConfig.connectors.default.email,
actionsStart.getUnsecuredActionsClient()
actionsStart.getUnsecuredActionsClient(),
logger
);
});
});

View file

@ -71,7 +71,12 @@ export class EmailServiceProvider
try {
const unsecuredActionsClient = actions.getUnsecuredActionsClient();
email = new LicensedEmailService(
new ConnectorsEmailService(PLUGIN_ID, emailConnector, unsecuredActionsClient),
new ConnectorsEmailService(
PLUGIN_ID,
emailConnector,
unsecuredActionsClient,
this.logger
),
licensing.license$,
MINIMUM_LICENSE,
this.logger

View file

@ -154,6 +154,13 @@ export const ALERT_WARNING_MAX_EXECUTABLE_ACTIONS_REASON = i18n.translate(
}
);
export const ALERT_WARNING_MAX_QUEUED_ACTIONS_REASON = i18n.translate(
'xpack.triggersActionsUI.sections.rulesList.ruleWarningReasonMaxQueuedActions',
{
defaultMessage: 'Queued action limit exceeded.',
}
);
export const ALERT_WARNING_MAX_ALERTS_REASON = i18n.translate(
'xpack.triggersActionsUI.sections.rulesList.ruleWarningReasonMaxAlerts',
{
@ -182,6 +189,7 @@ export const rulesErrorReasonTranslationsMapping = {
export const rulesWarningReasonTranslationsMapping = {
maxExecutableActions: ALERT_WARNING_MAX_EXECUTABLE_ACTIONS_REASON,
maxAlerts: ALERT_WARNING_MAX_ALERTS_REASON,
maxQueuedActions: ALERT_WARNING_MAX_QUEUED_ACTIONS_REASON,
unknown: ALERT_WARNING_UNKNOWN_REASON,
};

View file

@ -343,6 +343,7 @@ export function createTestConfig(name: string, options: CreateTestConfigOptions)
'--notifications.connectors.default.email=notification-email',
'--xpack.task_manager.allow_reading_invalid_state=false',
'--xpack.task_manager.requeue_invalid_tasks.enabled=true',
'--xpack.actions.queued.max=500',
],
},
};

View file

@ -344,6 +344,7 @@ export function defineRoutes(
)
: null,
params: req.body.params,
actionTypeId: req.params.id,
},
]);
return res.noContent();

View file

@ -31,6 +31,7 @@ export default function actionsTests({ loadTestFile, getService }: FtrProviderCo
loadTestFile(require.resolve('./type_not_enabled'));
loadTestFile(require.resolve('./schedule_unsecured_action'));
loadTestFile(require.resolve('./check_registered_connector_types'));
loadTestFile(require.resolve('./max_queued_actions_circuit_breaker'));
// note that this test will destroy existing spaces
loadTestFile(require.resolve('./migrations'));

View file

@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { ES_TEST_INDEX_NAME } from '@kbn/alerting-api-integration-helpers';
import { getEventLog, getTestRuleData, ObjectRemover } from '../../../common/lib';
import { FtrProviderContext } from '../../../common/ftr_provider_context';
// eslint-disable-next-line import/no-default-export
export default function createActionTests({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
describe('max queued actions circuit breaker', () => {
const objectRemover = new ObjectRemover(supertest);
const retry = getService('retry');
after(() => objectRemover.removeAll());
it('completes execution and reports back whether it reached the limit', async () => {
const response = await supertest
.post('/api/actions/connector')
.set('kbn-xsrf', 'foo')
.send({
name: 'My action',
connector_type_id: 'test.index-record',
config: {
unencrypted: `This value shouldn't get encrypted`,
},
secrets: {
encrypted: 'This value should be encrypted',
},
});
expect(response.status).to.eql(200);
const actionId = response.body.id;
objectRemover.add('default', actionId, 'action', 'actions');
const actions = [];
for (let i = 0; i < 510; i++) {
actions.push({
id: actionId,
group: 'default',
params: {
index: ES_TEST_INDEX_NAME,
reference: 'test',
message: '',
},
frequency: {
summary: false,
throttle: null,
notify_when: 'onActiveAlert',
},
});
}
const resp = await supertest
.post('/api/alerting/rule')
.set('kbn-xsrf', 'foo')
.send(
getTestRuleData({
rule_type_id: 'test.always-firing-alert-as-data',
schedule: { interval: '1h' },
throttle: undefined,
notify_when: undefined,
params: {
index: ES_TEST_INDEX_NAME,
reference: 'test',
},
actions,
})
);
expect(resp.status).to.eql(200);
const ruleId = resp.body.id;
objectRemover.add('default', ruleId, 'rule', 'alerting');
const events = await retry.try(async () => {
return await getEventLog({
getService,
spaceId: 'default',
type: 'alert',
id: ruleId,
provider: 'alerting',
actions: new Map([['execute', { gte: 1 }]]),
});
});
// check that there's a warning in the execute event
const executeEvent = events[0];
expect(executeEvent?.event?.outcome).to.eql('success');
expect(executeEvent?.event?.reason).to.eql('maxQueuedActions');
expect(executeEvent?.kibana?.alerting?.status).to.eql('warning');
expect(executeEvent?.message).to.eql(
'The maximum number of queued actions was reached; excess actions were not triggered.'
);
});
});
}