[Response Ops][Alerting] Refactor ExecutionHandler stage 1 (#186666)

Resolves https://github.com/elastic/kibana/issues/186533

## Summary

Stage 1 of `ExecutionHandler` refactor:

* Rename `ExecutionHandler` to `ActionScheduler`.
* Create schedulers to handle the three different action types
(`SummaryActionScheduler`, `SystemActionScheduler`,
`PerAlertActionScheduler`).
* Split the `ExecutionHandler.generateExecutables` function across the
appropriate action type classes and combine the returned executables from
each scheduler class.

GitHub is not recognizing the rename from `ExecutionHandler` to
`ActionScheduler`, so I've called out the primary difference between the
two files (other than the rename): the executables are now collected from
each scheduler class instead of from a `generateExecutables` function.
The `generateExecutables` fn and its associated private helper functions
have been removed from the `ActionScheduler`.
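
Since the rename makes the diff below hard to follow, here is a minimal sketch of how the schedulers now compose. The types are simplified placeholders for illustration only, not the real `IActionScheduler` generics defined in `./types`:

```ts
// Minimal sketch only: simplified stand-ins for the real IActionScheduler
// interface and Executable type shown in the diff below.
interface ExecutableSketch {
  action: { id: string };
  alert?: unknown;
  summarizedAlerts?: unknown;
}

interface ActionSchedulerSketch {
  // Each scheduler (summary, system, per-alert) reports a priority so the
  // combined list of executables has a deterministic order.
  priority: number;
  generateExecutables(opts: {
    alerts: Record<string, unknown>;
    throttledSummaryActions: Record<string, { date: string }>;
  }): Promise<ExecutableSketch[]>;
}

// Mirrors what ActionScheduler.run() now does instead of calling a single
// generateExecutables helper: ask each scheduler for its executables, in
// priority order, and concatenate the results.
async function collectExecutables(
  schedulers: ActionSchedulerSketch[],
  alerts: Record<string, unknown>
): Promise<ExecutableSketch[]> {
  const executables: ExecutableSketch[] = [];
  for (const scheduler of [...schedulers].sort((a, b) => a.priority - b.priority)) {
    executables.push(
      ...(await scheduler.generateExecutables({ alerts, throttledSummaryActions: {} }))
    );
  }
  return executables;
}
```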

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>


@@ -0,0 +1,605 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getRuleDetailsRoute, triggersActionsRoute } from '@kbn/rule-data-utils';
import { asSavedObjectExecutionSource } from '@kbn/actions-plugin/server';
import {
createTaskRunError,
isEphemeralTaskRejectedDueToCapacityError,
TaskErrorSource,
} from '@kbn/task-manager-plugin/server';
import {
ExecuteOptions as EnqueueExecutionOptions,
ExecutionResponseItem,
ExecutionResponseType,
} from '@kbn/actions-plugin/server/create_execute_function';
import { ActionsCompletion } from '@kbn/alerting-state-types';
import { chunk } from 'lodash';
import { CombinedSummarizedAlerts, ThrottledActions } from '../../types';
import { injectActionParams } from '../inject_action_params';
import { ActionSchedulerOptions, IActionScheduler, RuleUrl } from './types';
import {
transformActionParams,
TransformActionParamsOptions,
transformSummaryActionParams,
} from '../transform_action_params';
import { Alert } from '../../alert';
import {
AlertInstanceContext,
AlertInstanceState,
RuleAction,
RuleTypeParams,
RuleTypeState,
SanitizedRule,
RuleAlertData,
RuleSystemAction,
} from '../../../common';
import {
generateActionHash,
getSummaryActionsFromTaskState,
getSummaryActionTimeBounds,
isActionOnInterval,
} from './rule_action_helper';
import { RULE_SAVED_OBJECT_TYPE } from '../../saved_objects';
import { ConnectorAdapter } from '../../connector_adapters/types';
import { withAlertingSpan } from '../lib';
import * as schedulers from './schedulers';
interface LogAction {
id: string;
typeId: string;
alertId?: string;
alertGroup?: string;
alertSummary?: {
new: number;
ongoing: number;
recovered: number;
};
}
interface RunSummarizedActionArgs {
action: RuleAction;
summarizedAlerts: CombinedSummarizedAlerts;
spaceId: string;
bulkActions: EnqueueExecutionOptions[];
}
interface RunSystemActionArgs<Params extends RuleTypeParams> {
action: RuleSystemAction;
connectorAdapter: ConnectorAdapter;
summarizedAlerts: CombinedSummarizedAlerts;
rule: SanitizedRule<Params>;
ruleProducer: string;
spaceId: string;
bulkActions: EnqueueExecutionOptions[];
}
interface RunActionArgs<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string
> {
action: RuleAction;
alert: Alert<State, Context, ActionGroupIds | RecoveryActionGroupId>;
ruleId: string;
spaceId: string;
bulkActions: EnqueueExecutionOptions[];
}
export interface RunResult {
throttledSummaryActions: ThrottledActions;
}
export class ActionScheduler<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
> {
private readonly schedulers: Array<
IActionScheduler<State, Context, ActionGroupIds, RecoveryActionGroupId>
> = [];
private ephemeralActionsToSchedule: number;
private CHUNK_SIZE = 1000;
private ruleTypeActionGroups?: Map<ActionGroupIds | RecoveryActionGroupId, string>;
private previousStartedAt: Date | null;
constructor(
private readonly context: ActionSchedulerOptions<
Params,
ExtractedParams,
RuleState,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>
) {
this.ephemeralActionsToSchedule = context.taskRunnerContext.maxEphemeralActionsPerRule;
this.ruleTypeActionGroups = new Map(
context.ruleType.actionGroups.map((actionGroup) => [actionGroup.id, actionGroup.name])
);
this.previousStartedAt = context.previousStartedAt;
for (const [_, scheduler] of Object.entries(schedulers)) {
this.schedulers.push(new scheduler(context));
}
// sort schedulers by priority
this.schedulers.sort((a, b) => a.priority - b.priority);
}
public async run(
alerts: Record<string, Alert<State, Context, ActionGroupIds | RecoveryActionGroupId>>
): Promise<RunResult> {
const throttledSummaryActions: ThrottledActions = getSummaryActionsFromTaskState({
actions: this.context.rule.actions,
summaryActions: this.context.taskInstance.state?.summaryActions,
});
const executables = [];
for (const scheduler of this.schedulers) {
executables.push(
...(await scheduler.generateExecutables({ alerts, throttledSummaryActions }))
);
}
if (executables.length === 0) {
return { throttledSummaryActions };
}
const {
CHUNK_SIZE,
context: {
logger,
alertingEventLogger,
ruleRunMetricsStore,
taskRunnerContext: { actionsConfigMap },
taskInstance: {
params: { spaceId, alertId: ruleId },
},
},
} = this;
const logActions: Record<string, LogAction> = {};
const bulkActions: EnqueueExecutionOptions[] = [];
let bulkActionsResponse: ExecutionResponseItem[] = [];
this.context.ruleRunMetricsStore.incrementNumberOfGeneratedActions(executables.length);
for (const { action, alert, summarizedAlerts } of executables) {
const { actionTypeId } = action;
ruleRunMetricsStore.incrementNumberOfGeneratedActionsByConnectorType(actionTypeId);
if (ruleRunMetricsStore.hasReachedTheExecutableActionsLimit(actionsConfigMap)) {
ruleRunMetricsStore.setTriggeredActionsStatusByConnectorType({
actionTypeId,
status: ActionsCompletion.PARTIAL,
});
logger.debug(
`Rule "${this.context.rule.id}" skipped scheduling action "${action.id}" because the maximum number of allowed actions has been reached.`
);
break;
}
if (
ruleRunMetricsStore.hasReachedTheExecutableActionsLimitByConnectorType({
actionTypeId,
actionsConfigMap,
})
) {
if (!ruleRunMetricsStore.hasConnectorTypeReachedTheLimit(actionTypeId)) {
logger.debug(
`Rule "${this.context.rule.id}" skipped scheduling action "${action.id}" because the maximum number of allowed actions for connector type ${actionTypeId} has been reached.`
);
}
ruleRunMetricsStore.setTriggeredActionsStatusByConnectorType({
actionTypeId,
status: ActionsCompletion.PARTIAL,
});
continue;
}
if (!this.isExecutableAction(action)) {
this.context.logger.warn(
`Rule "${this.context.taskInstance.params.alertId}" skipped scheduling action "${action.id}" because it is disabled`
);
continue;
}
ruleRunMetricsStore.incrementNumberOfTriggeredActions();
ruleRunMetricsStore.incrementNumberOfTriggeredActionsByConnectorType(actionTypeId);
if (!this.isSystemAction(action) && summarizedAlerts) {
const defaultAction = action as RuleAction;
if (isActionOnInterval(action)) {
throttledSummaryActions[defaultAction.uuid!] = { date: new Date().toISOString() };
}
logActions[defaultAction.id] = await this.runSummarizedAction({
action,
summarizedAlerts,
spaceId,
bulkActions,
});
} else if (summarizedAlerts && this.isSystemAction(action)) {
const hasConnectorAdapter = this.context.taskRunnerContext.connectorAdapterRegistry.has(
action.actionTypeId
);
/**
* System actions without an adapter
* cannot be executed
*
*/
if (!hasConnectorAdapter) {
this.context.logger.warn(
`Rule "${this.context.taskInstance.params.alertId}" skipped scheduling system action "${action.id}" because no connector adapter is configured`
);
continue;
}
const connectorAdapter = this.context.taskRunnerContext.connectorAdapterRegistry.get(
action.actionTypeId
);
logActions[action.id] = await this.runSystemAction({
action,
connectorAdapter,
summarizedAlerts,
rule: this.context.rule,
ruleProducer: this.context.ruleType.producer,
spaceId,
bulkActions,
});
} else if (!this.isSystemAction(action) && alert) {
const defaultAction = action as RuleAction;
logActions[defaultAction.id] = await this.runAction({
action,
spaceId,
alert,
ruleId,
bulkActions,
});
const actionGroup = defaultAction.group;
if (!this.isRecoveredAlert(actionGroup)) {
if (isActionOnInterval(action)) {
alert.updateLastScheduledActions(
defaultAction.group as ActionGroupIds,
generateActionHash(action),
defaultAction.uuid
);
} else {
alert.updateLastScheduledActions(defaultAction.group as ActionGroupIds);
}
alert.unscheduleActions();
}
}
}
if (!!bulkActions.length) {
for (const c of chunk(bulkActions, CHUNK_SIZE)) {
let enqueueResponse;
try {
enqueueResponse = await withAlertingSpan('alerting:bulk-enqueue-actions', () =>
this.context.actionsClient!.bulkEnqueueExecution(c)
);
} catch (e) {
if (e.statusCode === 404) {
throw createTaskRunError(e, TaskErrorSource.USER);
}
throw createTaskRunError(e, TaskErrorSource.FRAMEWORK);
}
if (enqueueResponse.errors) {
bulkActionsResponse = bulkActionsResponse.concat(
enqueueResponse.items.filter(
(i) => i.response === ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR
)
);
}
}
}
if (!!bulkActionsResponse.length) {
for (const r of bulkActionsResponse) {
if (r.response === ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR) {
ruleRunMetricsStore.setHasReachedQueuedActionsLimit(true);
ruleRunMetricsStore.decrementNumberOfTriggeredActions();
ruleRunMetricsStore.decrementNumberOfTriggeredActionsByConnectorType(r.actionTypeId);
ruleRunMetricsStore.setTriggeredActionsStatusByConnectorType({
actionTypeId: r.actionTypeId,
status: ActionsCompletion.PARTIAL,
});
logger.debug(
`Rule "${this.context.rule.id}" skipped scheduling action "${r.id}" because the maximum number of queued actions has been reached.`
);
delete logActions[r.id];
}
}
}
const logActionsValues = Object.values(logActions);
if (!!logActionsValues.length) {
for (const action of logActionsValues) {
alertingEventLogger.logAction(action);
}
}
return { throttledSummaryActions };
}
private async runSummarizedAction({
action,
summarizedAlerts,
spaceId,
bulkActions,
}: RunSummarizedActionArgs): Promise<LogAction> {
const { start, end } = getSummaryActionTimeBounds(
action,
this.context.rule.schedule,
this.previousStartedAt
);
const ruleUrl = this.buildRuleUrl(spaceId, start, end);
const actionToRun = {
...action,
params: injectActionParams({
actionTypeId: action.actionTypeId,
ruleUrl,
ruleName: this.context.rule.name,
actionParams: transformSummaryActionParams({
alerts: summarizedAlerts,
rule: this.context.rule,
ruleTypeId: this.context.ruleType.id,
actionId: action.id,
actionParams: action.params,
spaceId,
actionsPlugin: this.context.taskRunnerContext.actionsPlugin,
actionTypeId: action.actionTypeId,
kibanaBaseUrl: this.context.taskRunnerContext.kibanaBaseUrl,
ruleUrl: ruleUrl?.absoluteUrl,
}),
}),
};
await this.actionRunOrAddToBulk({
enqueueOptions: this.getEnqueueOptions(actionToRun),
bulkActions,
});
return {
id: action.id,
typeId: action.actionTypeId,
alertSummary: {
new: summarizedAlerts.new.count,
ongoing: summarizedAlerts.ongoing.count,
recovered: summarizedAlerts.recovered.count,
},
};
}
private async runSystemAction({
action,
spaceId,
connectorAdapter,
summarizedAlerts,
rule,
ruleProducer,
bulkActions,
}: RunSystemActionArgs<Params>): Promise<LogAction> {
const ruleUrl = this.buildRuleUrl(spaceId);
const connectorAdapterActionParams = connectorAdapter.buildActionParams({
alerts: summarizedAlerts,
rule: {
id: rule.id,
tags: rule.tags,
name: rule.name,
consumer: rule.consumer,
producer: ruleProducer,
},
ruleUrl: ruleUrl?.absoluteUrl,
spaceId,
params: action.params,
});
const actionToRun = Object.assign(action, { params: connectorAdapterActionParams });
await this.actionRunOrAddToBulk({
enqueueOptions: this.getEnqueueOptions(actionToRun),
bulkActions,
});
return {
id: action.id,
typeId: action.actionTypeId,
alertSummary: {
new: summarizedAlerts.new.count,
ongoing: summarizedAlerts.ongoing.count,
recovered: summarizedAlerts.recovered.count,
},
};
}
private async runAction({
action,
spaceId,
alert,
ruleId,
bulkActions,
}: RunActionArgs<State, Context, ActionGroupIds, RecoveryActionGroupId>): Promise<LogAction> {
const ruleUrl = this.buildRuleUrl(spaceId);
const executableAlert = alert!;
const actionGroup = action.group as ActionGroupIds;
const transformActionParamsOptions: TransformActionParamsOptions = {
actionsPlugin: this.context.taskRunnerContext.actionsPlugin,
alertId: ruleId,
alertType: this.context.ruleType.id,
actionTypeId: action.actionTypeId,
alertName: this.context.rule.name,
spaceId,
tags: this.context.rule.tags,
alertInstanceId: executableAlert.getId(),
alertUuid: executableAlert.getUuid(),
alertActionGroup: actionGroup,
alertActionGroupName: this.ruleTypeActionGroups!.get(actionGroup)!,
context: executableAlert.getContext(),
actionId: action.id,
state: executableAlert.getState(),
kibanaBaseUrl: this.context.taskRunnerContext.kibanaBaseUrl,
alertParams: this.context.rule.params,
actionParams: action.params,
flapping: executableAlert.getFlapping(),
ruleUrl: ruleUrl?.absoluteUrl,
};
if (executableAlert.isAlertAsData()) {
transformActionParamsOptions.aadAlert = executableAlert.getAlertAsData();
}
const actionToRun = {
...action,
params: injectActionParams({
actionTypeId: action.actionTypeId,
ruleUrl,
ruleName: this.context.rule.name,
actionParams: transformActionParams(transformActionParamsOptions),
}),
};
await this.actionRunOrAddToBulk({
enqueueOptions: this.getEnqueueOptions(actionToRun),
bulkActions,
});
return {
id: action.id,
typeId: action.actionTypeId,
alertId: alert.getId(),
alertGroup: action.group,
};
}
private isExecutableAction(action: RuleAction | RuleSystemAction) {
return this.context.taskRunnerContext.actionsPlugin.isActionExecutable(
action.id,
action.actionTypeId,
{
notifyUsage: true,
}
);
}
private isSystemAction(action?: RuleAction | RuleSystemAction): action is RuleSystemAction {
return this.context.taskRunnerContext.actionsPlugin.isSystemActionConnector(action?.id ?? '');
}
private isRecoveredAlert(actionGroup: string) {
return actionGroup === this.context.ruleType.recoveryActionGroup.id;
}
private buildRuleUrl(spaceId: string, start?: number, end?: number): RuleUrl | undefined {
if (!this.context.taskRunnerContext.kibanaBaseUrl) {
return;
}
const relativePath = this.context.ruleType.getViewInAppRelativeUrl
? this.context.ruleType.getViewInAppRelativeUrl({ rule: this.context.rule, start, end })
: `${triggersActionsRoute}${getRuleDetailsRoute(this.context.rule.id)}`;
try {
const basePathname = new URL(this.context.taskRunnerContext.kibanaBaseUrl).pathname;
const basePathnamePrefix = basePathname !== '/' ? `${basePathname}` : '';
const spaceIdSegment = spaceId !== 'default' ? `/s/${spaceId}` : '';
const ruleUrl = new URL(
[basePathnamePrefix, spaceIdSegment, relativePath].join(''),
this.context.taskRunnerContext.kibanaBaseUrl
);
return {
absoluteUrl: ruleUrl.toString(),
kibanaBaseUrl: this.context.taskRunnerContext.kibanaBaseUrl,
basePathname: basePathnamePrefix,
spaceIdSegment,
relativePath,
};
} catch (error) {
this.context.logger.debug(
`Rule "${this.context.rule.id}" encountered an error while constructing the rule.url variable: ${error.message}`
);
return;
}
}
private getEnqueueOptions(action: RuleAction | RuleSystemAction): EnqueueExecutionOptions {
const {
context: {
apiKey,
ruleConsumer,
executionId,
taskInstance: {
params: { spaceId, alertId: ruleId },
},
},
} = this;
const namespace = spaceId === 'default' ? {} : { namespace: spaceId };
return {
id: action.id,
params: action.params,
spaceId,
apiKey: apiKey ?? null,
consumer: ruleConsumer,
source: asSavedObjectExecutionSource({
id: ruleId,
type: RULE_SAVED_OBJECT_TYPE,
}),
executionId,
relatedSavedObjects: [
{
id: ruleId,
type: RULE_SAVED_OBJECT_TYPE,
namespace: namespace.namespace,
typeId: this.context.ruleType.id,
},
],
actionTypeId: action.actionTypeId,
};
}
private async actionRunOrAddToBulk({
enqueueOptions,
bulkActions,
}: {
enqueueOptions: EnqueueExecutionOptions;
bulkActions: EnqueueExecutionOptions[];
}) {
if (
this.context.taskRunnerContext.supportsEphemeralTasks &&
this.ephemeralActionsToSchedule > 0
) {
this.ephemeralActionsToSchedule--;
try {
await this.context.actionsClient!.ephemeralEnqueuedExecution(enqueueOptions);
} catch (err) {
if (isEphemeralTaskRejectedDueToCapacityError(err)) {
bulkActions.push(enqueueOptions);
}
}
} else {
bulkActions.push(enqueueOptions);
}
}
}


@@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getSummarizedAlerts } from './get_summarized_alerts';
import { alertsClientMock } from '../../alerts_client/alerts_client.mock';
import { mockAAD } from '../fixtures';
import { ALERT_UUID } from '@kbn/rule-data-utils';
import { generateAlert } from './test_fixtures';
import { getErrorSource } from '@kbn/task-manager-plugin/server/task_running';
const alertsClient = alertsClientMock.create();
describe('getSummarizedAlerts', () => {
const newAlert1 = generateAlert({ id: 1 });
const newAlert2 = generateAlert({ id: 2 });
const alerts = { ...newAlert1, ...newAlert2 };
beforeEach(() => {
jest.resetAllMocks();
});
test('should call alertsClient.getSummarizedAlerts with the correct params', async () => {
const summarizedAlerts = {
new: {
count: 2,
data: [
{ ...mockAAD, [ALERT_UUID]: alerts[1].getUuid() },
{ ...mockAAD, [ALERT_UUID]: alerts[2].getUuid() },
],
},
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const result = await getSummarizedAlerts({
alertsClient,
queryOptions: {
excludedAlertInstanceIds: [],
executionUuid: '123xyz',
ruleId: '1',
spaceId: 'test1',
},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: '123xyz',
ruleId: '1',
spaceId: 'test1',
});
expect(result).toEqual({
...summarizedAlerts,
all: summarizedAlerts.new,
});
});
test('should throw error if alertsClient.getSummarizedAlerts throws error', async () => {
alertsClient.getSummarizedAlerts.mockImplementation(() => {
throw new Error('cannot get summarized alerts');
});
try {
await getSummarizedAlerts({
alertsClient,
queryOptions: {
excludedAlertInstanceIds: [],
executionUuid: '123xyz',
ruleId: '1',
spaceId: 'test1',
},
});
} catch (err) {
expect(getErrorSource(err)).toBe('framework');
expect(err.message).toBe('cannot get summarized alerts');
}
});
test('should remove alert from summarized alerts if it is new and has a maintenance window', async () => {
const newAlertWithMaintenanceWindow = generateAlert({
id: 1,
maintenanceWindowIds: ['mw-1'],
});
const alertsWithMaintenanceWindow = { ...newAlertWithMaintenanceWindow, ...newAlert2 };
const newAADAlerts = [
{ ...mockAAD, [ALERT_UUID]: newAlertWithMaintenanceWindow[1].getUuid() },
{ ...mockAAD, [ALERT_UUID]: alerts[2].getUuid() },
];
const summarizedAlerts = {
new: { count: 2, data: newAADAlerts },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
alertsClient.getProcessedAlerts.mockReturnValue(alertsWithMaintenanceWindow);
const result = await getSummarizedAlerts({
alertsClient,
queryOptions: {
excludedAlertInstanceIds: [],
executionUuid: '123xyz',
ruleId: '1',
spaceId: 'test1',
},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: '123xyz',
ruleId: '1',
spaceId: 'test1',
});
expect(result).toEqual({
new: { count: 1, data: [newAADAlerts[1]] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
all: { count: 1, data: [newAADAlerts[1]] },
});
});
});


@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ALERT_UUID } from '@kbn/rule-data-utils';
import { createTaskRunError, TaskErrorSource } from '@kbn/task-manager-plugin/server';
import { GetSummarizedAlertsParams, IAlertsClient } from '../../alerts_client/types';
import {
AlertInstanceContext,
AlertInstanceState,
CombinedSummarizedAlerts,
RuleAlertData,
} from '../../types';
interface GetSummarizedAlertsOpts<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
> {
alertsClient: IAlertsClient<AlertData, State, Context, ActionGroupIds, RecoveryActionGroupId>;
queryOptions: GetSummarizedAlertsParams;
}
export const getSummarizedAlerts = async <
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
>({
alertsClient,
queryOptions,
}: GetSummarizedAlertsOpts<
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>): Promise<CombinedSummarizedAlerts> => {
let alerts;
try {
alerts = await alertsClient.getSummarizedAlerts!(queryOptions);
} catch (e) {
throw createTaskRunError(e, TaskErrorSource.FRAMEWORK);
}
/**
* We need to remove all new alerts with maintenance windows retrieved from
* getSummarizedAlerts because they might not have maintenance window IDs
* associated with them from maintenance windows with scoped query updated
* yet (the update call uses refresh: false). So we need to rely on the in
* memory alerts to do this.
*/
const newAlertsInMemory = Object.values(alertsClient.getProcessedAlerts('new') || {}) || [];
const newAlertsWithMaintenanceWindowIds = newAlertsInMemory.reduce<string[]>((result, alert) => {
if (alert.getMaintenanceWindowIds().length > 0) {
result.push(alert.getUuid());
}
return result;
}, []);
const newAlerts = alerts.new.data.filter((alert) => {
return !newAlertsWithMaintenanceWindowIds.includes(alert[ALERT_UUID]);
});
const total = newAlerts.length + alerts.ongoing.count + alerts.recovered.count;
return {
...alerts,
new: { count: newAlerts.length, data: newAlerts },
all: { count: total, data: [...newAlerts, ...alerts.ongoing.data, ...alerts.recovered.data] },
};
};

View file

@@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { ActionScheduler } from './action_scheduler';
export type { RunResult } from './action_scheduler';
export type { RuleUrl } from './types';


@@ -6,15 +6,16 @@
*/
import { Logger } from '@kbn/logging';
import { RuleAction } from '../types';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { RuleAction } from '../../types';
import {
generateActionHash,
getSummaryActionsFromTaskState,
isActionOnInterval,
isSummaryAction,
isSummaryActionOnInterval,
isSummaryActionThrottled,
getSummaryActionTimeBounds,
logNumberOfFilteredAlerts,
} from './rule_action_helper';
const now = '2021-05-13T12:33:37.000Z';
@@ -291,30 +292,6 @@ describe('rule_action_helper', () => {
});
});
describe('isSummaryActionOnInterval', () => {
test('returns true for a summary action on interval', () => {
expect(isSummaryActionOnInterval(mockSummaryAction)).toBe(true);
});
test('returns false for a non-summary ', () => {
expect(
isSummaryActionOnInterval({
...mockAction,
frequency: { summary: false, notifyWhen: 'onThrottleInterval', throttle: '1h' },
})
).toBe(false);
});
test('returns false for a summary per rule run ', () => {
expect(
isSummaryActionOnInterval({
...mockAction,
frequency: { summary: true, notifyWhen: 'onActiveAlert', throttle: null },
})
).toBe(false);
});
});
describe('getSummaryActionTimeBounds', () => {
test('returns undefined start and end action is not summary action', () => {
expect(getSummaryActionTimeBounds(mockAction, { interval: '1m' }, null)).toEqual({
@@ -370,4 +347,30 @@ describe('rule_action_helper', () => {
expect(start).toEqual(new Date('2021-05-13T12:32:37.000Z').valueOf());
});
});
describe('logNumberOfFilteredAlerts', () => {
test('should log when the number of alerts is different than the number of summarized alerts', () => {
const logger = loggingSystemMock.create().get();
logNumberOfFilteredAlerts({
logger,
numberOfAlerts: 10,
numberOfSummarizedAlerts: 5,
action: mockSummaryAction,
});
expect(logger.debug).toHaveBeenCalledWith(
'(5) alerts have been filtered out for: slack:111-111'
);
});
test('should not log when the number of alerts is the same as the number of summarized alerts', () => {
const logger = loggingSystemMock.create().get();
logNumberOfFilteredAlerts({
logger,
numberOfAlerts: 10,
numberOfSummarizedAlerts: 10,
action: mockSummaryAction,
});
expect(logger.debug).not.toHaveBeenCalled();
});
});
});


@@ -12,7 +12,7 @@ import {
RuleAction,
RuleNotifyWhenTypeValues,
ThrottledActions,
} from '../../common';
} from '../../../common';
export const isSummaryAction = (action?: RuleAction) => {
return action?.frequency?.summary ?? false;
@@ -28,10 +28,6 @@ export const isActionOnInterval = (action?: RuleAction) => {
);
};
export const isSummaryActionOnInterval = (action: RuleAction) => {
return isActionOnInterval(action) && action.frequency?.summary;
};
export const isSummaryActionThrottled = ({
action,
throttledSummaryActions,
@@ -129,3 +125,25 @@ export const getSummaryActionTimeBounds = (
return { start: startDate.valueOf(), end: now.valueOf() };
};
interface LogNumberOfFilteredAlertsOpts {
logger: Logger;
numberOfAlerts: number;
numberOfSummarizedAlerts: number;
action: RuleAction;
}
export const logNumberOfFilteredAlerts = ({
logger,
numberOfAlerts = 0,
numberOfSummarizedAlerts = 0,
action,
}: LogNumberOfFilteredAlertsOpts) => {
const count = numberOfAlerts - numberOfSummarizedAlerts;
if (count > 0) {
logger.debug(
`(${count}) alert${count > 1 ? 's' : ''} ${
count > 1 ? 'have' : 'has'
} been filtered out for: ${action.actionTypeId}:${action.uuid}`
);
}
};


@@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { SystemActionScheduler } from './system_action_scheduler';
export { SummaryActionScheduler } from './summary_action_scheduler';
export { PerAlertActionScheduler } from './per_alert_action_scheduler';


@@ -0,0 +1,849 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import sinon from 'sinon';
import { actionsClientMock, actionsMock } from '@kbn/actions-plugin/server/mocks';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { alertsClientMock } from '../../../alerts_client/alerts_client.mock';
import { alertingEventLoggerMock } from '../../../lib/alerting_event_logger/alerting_event_logger.mock';
import { RuleRunMetricsStore } from '../../../lib/rule_run_metrics_store';
import { mockAAD } from '../../fixtures';
import { PerAlertActionScheduler } from './per_alert_action_scheduler';
import { getRule, getRuleType, getDefaultSchedulerContext, generateAlert } from '../test_fixtures';
import { SanitizedRuleAction } from '@kbn/alerting-types';
import { ALERT_UUID } from '@kbn/rule-data-utils';
const alertingEventLogger = alertingEventLoggerMock.create();
const actionsClient = actionsClientMock.create();
const alertsClient = alertsClientMock.create();
const mockActionsPlugin = actionsMock.createStart();
const logger = loggingSystemMock.create().get();
let ruleRunMetricsStore: RuleRunMetricsStore;
const rule = getRule({
actions: [
{
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
},
{
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '222-222',
},
{
id: '3',
group: 'default',
actionTypeId: 'test',
frequency: { summary: true, notifyWhen: 'onActiveAlert' },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '333-333',
},
],
});
const ruleType = getRuleType();
const defaultSchedulerContext = getDefaultSchedulerContext(
logger,
mockActionsPlugin,
alertingEventLogger,
actionsClient,
alertsClient
);
// @ts-ignore
const getSchedulerContext = (params = {}) => {
return { ...defaultSchedulerContext, rule, ...params, ruleRunMetricsStore };
};
let clock: sinon.SinonFakeTimers;
describe('Per-Alert Action Scheduler', () => {
beforeAll(() => {
clock = sinon.useFakeTimers();
});
beforeEach(() => {
jest.resetAllMocks();
mockActionsPlugin.isActionTypeEnabled.mockReturnValue(true);
mockActionsPlugin.isActionExecutable.mockReturnValue(true);
mockActionsPlugin.getActionsClientWithRequest.mockResolvedValue(actionsClient);
ruleRunMetricsStore = new RuleRunMetricsStore();
});
afterAll(() => {
clock.restore();
});
test('should initialize with only per-alert actions', () => {
const scheduler = new PerAlertActionScheduler(getSchedulerContext());
// @ts-expect-error private variable
expect(scheduler.actions).toHaveLength(2);
// @ts-expect-error private variable
expect(scheduler.actions).toEqual([rule.actions[0], rule.actions[1]]);
expect(logger.error).not.toHaveBeenCalled();
});
test('should not initialize action and log if rule type does not support summarized alerts and action has alertsFilter', () => {
const actions = [
{
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
},
{
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
alertsFilter: {
query: { kql: 'kibana.alert.rule.name:foo', dsl: '{}', filters: [] },
},
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '222-222',
},
];
const scheduler = new PerAlertActionScheduler(
getSchedulerContext({
rule: {
...rule,
actions,
},
ruleType: { ...ruleType, alerts: undefined },
})
);
// @ts-expect-error private variable
expect(scheduler.actions).toHaveLength(1);
// @ts-expect-error private variable
expect(scheduler.actions).toEqual([actions[0]]);
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
`Skipping action \"2\" for rule \"1\" because the rule type \"Test\" does not support alert-as-data.`
);
});
describe('generateExecutables', () => {
const newAlert1 = generateAlert({ id: 1 });
const newAlert2 = generateAlert({ id: 2 });
const alerts = { ...newAlert1, ...newAlert2 };
test('should generate executable for each alert and each action', async () => {
const scheduler = new PerAlertActionScheduler(getSchedulerContext());
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(logger.debug).not.toHaveBeenCalled();
expect(executables).toHaveLength(4);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['1'] },
{ action: rule.actions[0], alert: alerts['2'] },
{ action: rule.actions[1], alert: alerts['1'] },
{ action: rule.actions[1], alert: alerts['2'] },
]);
});
test('should skip generating executable when alert has maintenance window', async () => {
const scheduler = new PerAlertActionScheduler(getSchedulerContext());
const newAlertWithMaintenanceWindow = generateAlert({
id: 1,
maintenanceWindowIds: ['mw-1'],
});
const alertsWithMaintenanceWindow = { ...newAlertWithMaintenanceWindow, ...newAlert2 };
const executables = await scheduler.generateExecutables({
alerts: alertsWithMaintenanceWindow,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(logger.debug).toHaveBeenCalledTimes(2);
expect(logger.debug).toHaveBeenNthCalledWith(
1,
`no scheduling of summary actions \"1\" for rule \"1\": has active maintenance windows mw-1.`
);
expect(logger.debug).toHaveBeenNthCalledWith(
2,
`no scheduling of summary actions \"2\" for rule \"1\": has active maintenance windows mw-1.`
);
expect(executables).toHaveLength(2);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['2'] },
{ action: rule.actions[1], alert: alerts['2'] },
]);
});
test('should skip generating executable when alert has invalid action group', async () => {
const scheduler = new PerAlertActionScheduler(getSchedulerContext());
const newAlertInvalidActionGroup = generateAlert({
id: 1,
// @ts-expect-error
group: 'invalid',
});
const alertsWithInvalidActionGroup = { ...newAlertInvalidActionGroup, ...newAlert2 };
const executables = await scheduler.generateExecutables({
alerts: alertsWithInvalidActionGroup,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledTimes(2);
expect(logger.error).toHaveBeenNthCalledWith(
1,
`Invalid action group \"invalid\" for rule \"test\".`
);
expect(logger.error).toHaveBeenNthCalledWith(
2,
`Invalid action group \"invalid\" for rule \"test\".`
);
expect(executables).toHaveLength(2);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['2'] },
{ action: rule.actions[1], alert: alerts['2'] },
]);
});
test('should skip generating executable when alert has pending recovered count greater than 0 and notifyWhen is onActiveAlert', async () => {
const scheduler = new PerAlertActionScheduler(getSchedulerContext());
const newAlertWithPendingRecoveredCount = generateAlert({
id: 1,
pendingRecoveredCount: 3,
});
const alertsWithPendingRecoveredCount = {
...newAlertWithPendingRecoveredCount,
...newAlert2,
};
const executables = await scheduler.generateExecutables({
alerts: alertsWithPendingRecoveredCount,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(executables).toHaveLength(2);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['2'] },
{ action: rule.actions[1], alert: alerts['2'] },
]);
});
test('should skip generating executable when alert has pending recovered count greater than 0 and notifyWhen is onThrottleInterval', async () => {
const onThrottleIntervalAction: SanitizedRuleAction = {
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onThrottleInterval', throttle: '1h' },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '222-222',
};
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], onThrottleIntervalAction] },
});
const newAlertWithPendingRecoveredCount = generateAlert({
id: 1,
pendingRecoveredCount: 3,
});
const alertsWithPendingRecoveredCount = {
...newAlertWithPendingRecoveredCount,
...newAlert2,
};
const executables = await scheduler.generateExecutables({
alerts: alertsWithPendingRecoveredCount,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(executables).toHaveLength(2);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['2'] },
{ action: onThrottleIntervalAction, alert: alerts['2'] },
]);
});
test('should skip generating executable when alert is muted', async () => {
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, mutedInstanceIds: ['2'] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(logger.debug).toHaveBeenCalledTimes(1);
expect(logger.debug).toHaveBeenNthCalledWith(
1,
`skipping scheduling of actions for '2' in rule rule-label: rule is muted`
);
expect(executables).toHaveLength(2);
// @ts-expect-error private variable
expect(scheduler.skippedAlerts).toEqual({ '2': { reason: 'muted' } });
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['1'] },
{ action: rule.actions[1], alert: alerts['1'] },
]);
});
test('should skip generating executable when alert action group has not changed and notifyWhen is onActionGroupChange', async () => {
const onActionGroupChangeAction: SanitizedRuleAction = {
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActionGroupChange', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '222-222',
};
const activeAlert1 = generateAlert({
id: 1,
group: 'default',
lastScheduledActionsGroup: 'other-group',
});
const activeAlert2 = generateAlert({
id: 2,
group: 'default',
lastScheduledActionsGroup: 'default',
});
const alertsWithOngoingAlert = { ...activeAlert1, ...activeAlert2 };
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], onActionGroupChangeAction] },
});
const executables = await scheduler.generateExecutables({
alerts: alertsWithOngoingAlert,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(logger.debug).toHaveBeenCalledTimes(1);
expect(logger.debug).toHaveBeenNthCalledWith(
1,
`skipping scheduling of actions for '2' in rule rule-label: alert is active but action group has not changed`
);
expect(executables).toHaveLength(3);
// @ts-expect-error private variable
expect(scheduler.skippedAlerts).toEqual({ '2': { reason: 'actionGroupHasNotChanged' } });
expect(executables).toEqual([
{ action: rule.actions[0], alert: alertsWithOngoingAlert['1'] },
{ action: rule.actions[0], alert: alertsWithOngoingAlert['2'] },
{ action: onActionGroupChangeAction, alert: alertsWithOngoingAlert['1'] },
]);
});
test('should skip generating executable when throttle interval has not passed and notifyWhen is onThrottleInterval', async () => {
const onThrottleIntervalAction: SanitizedRuleAction = {
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onThrottleInterval', throttle: '1h' },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '222-222',
};
const activeAlert2 = generateAlert({
id: 2,
lastScheduledActionsGroup: 'default',
throttledActions: { '222-222': { date: '1969-12-31T23:10:00.000Z' } },
});
const alertsWithOngoingAlert = { ...newAlert1, ...activeAlert2 };
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], onThrottleIntervalAction] },
});
const executables = await scheduler.generateExecutables({
alerts: alertsWithOngoingAlert,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(logger.debug).toHaveBeenCalledTimes(1);
expect(logger.debug).toHaveBeenNthCalledWith(
1,
`skipping scheduling of actions for '2' in rule rule-label: rule is throttled`
);
expect(executables).toHaveLength(3);
// @ts-expect-error private variable
expect(scheduler.skippedAlerts).toEqual({ '2': { reason: 'throttled' } });
expect(executables).toEqual([
{ action: rule.actions[0], alert: alertsWithOngoingAlert['1'] },
{ action: rule.actions[0], alert: alertsWithOngoingAlert['2'] },
{ action: onThrottleIntervalAction, alert: alertsWithOngoingAlert['1'] },
]);
});
test('should not skip generating executable when throttle interval has passed and notifyWhen is onThrottleInterval', async () => {
const onThrottleIntervalAction: SanitizedRuleAction = {
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onThrottleInterval', throttle: '1h' },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '222-222',
};
const activeAlert2 = generateAlert({
id: 2,
lastScheduledActionsGroup: 'default',
throttledActions: { '222-222': { date: '1969-12-31T22:10:00.000Z' } },
});
const alertsWithOngoingAlert = { ...newAlert1, ...activeAlert2 };
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], onThrottleIntervalAction] },
});
const executables = await scheduler.generateExecutables({
alerts: alertsWithOngoingAlert,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(logger.debug).not.toHaveBeenCalled();
expect(executables).toHaveLength(4);
// @ts-expect-error private variable
expect(scheduler.skippedAlerts).toEqual({});
expect(executables).toEqual([
{ action: rule.actions[0], alert: alertsWithOngoingAlert['1'] },
{ action: rule.actions[0], alert: alertsWithOngoingAlert['2'] },
{ action: onThrottleIntervalAction, alert: alertsWithOngoingAlert['1'] },
{ action: onThrottleIntervalAction, alert: alertsWithOngoingAlert['2'] },
]);
});
test('should query for summarized alerts if useAlertDataForTemplate is true', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: {
count: 1,
data: [
{ ...mockAAD, [ALERT_UUID]: alerts[1].getUuid() },
{ ...mockAAD, [ALERT_UUID]: alerts[2].getUuid() },
],
},
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const actionWithUseAlertDataForTemplate: SanitizedRuleAction = {
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
useAlertDataForTemplate: true,
};
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], actionWithUseAlertDataForTemplate] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
});
expect(executables).toHaveLength(4);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['1'] },
{ action: rule.actions[0], alert: alerts['2'] },
{ action: actionWithUseAlertDataForTemplate, alert: alerts['1'] },
{ action: actionWithUseAlertDataForTemplate, alert: alerts['2'] },
]);
});
test('should query for summarized alerts if useAlertDataForTemplate is true and action has throttle interval', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: {
count: 1,
data: [
{ ...mockAAD, [ALERT_UUID]: alerts[1].getUuid() },
{ ...mockAAD, [ALERT_UUID]: alerts[2].getUuid() },
],
},
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const actionWithUseAlertDataForTemplate: SanitizedRuleAction = {
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onThrottleInterval', throttle: '1h' },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
useAlertDataForTemplate: true,
};
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], actionWithUseAlertDataForTemplate] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
ruleId: '1',
spaceId: 'test1',
start: new Date('1969-12-31T23:00:00.000Z'),
end: new Date('1970-01-01T00:00:00.000Z'),
});
expect(executables).toHaveLength(4);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['1'] },
{ action: rule.actions[0], alert: alerts['2'] },
{ action: actionWithUseAlertDataForTemplate, alert: alerts['1'] },
{ action: actionWithUseAlertDataForTemplate, alert: alerts['2'] },
]);
});
test('should query for summarized alerts if action has alertsFilter', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: {
count: 1,
data: [
{ ...mockAAD, [ALERT_UUID]: alerts[1].getUuid() },
{ ...mockAAD, [ALERT_UUID]: alerts[2].getUuid() },
],
},
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const actionWithAlertsFilter: SanitizedRuleAction = {
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', filters: [] } },
};
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], actionWithAlertsFilter] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', filters: [] } },
});
expect(executables).toHaveLength(4);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['1'] },
{ action: rule.actions[0], alert: alerts['2'] },
{ action: actionWithAlertsFilter, alert: alerts['1'] },
{ action: actionWithAlertsFilter, alert: alerts['2'] },
]);
});
test('should query for summarized alerts if action has alertsFilter and action has throttle interval', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: {
count: 1,
data: [
{ ...mockAAD, [ALERT_UUID]: alerts[1].getUuid() },
{ ...mockAAD, [ALERT_UUID]: alerts[2].getUuid() },
],
},
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const actionWithAlertsFilter: SanitizedRuleAction = {
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onThrottleInterval', throttle: '6h' },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', filters: [] } },
};
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], actionWithAlertsFilter] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
ruleId: '1',
spaceId: 'test1',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', filters: [] } },
start: new Date('1969-12-31T18:00:00.000Z'),
end: new Date('1970-01-01T00:00:00.000Z'),
});
expect(executables).toHaveLength(4);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['1'] },
{ action: rule.actions[0], alert: alerts['2'] },
{ action: actionWithAlertsFilter, alert: alerts['1'] },
{ action: actionWithAlertsFilter, alert: alerts['2'] },
]);
});
test('should skip generating executable if alert does not match any alerts in summarized alerts', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: {
count: 1,
data: [
{ ...mockAAD, [ALERT_UUID]: alerts[1].getUuid() },
{ ...mockAAD, [ALERT_UUID]: 'uuid-not-a-match' },
],
},
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const actionWithAlertsFilter: SanitizedRuleAction = {
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', filters: [] } },
};
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], actionWithAlertsFilter] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', filters: [] } },
});
expect(executables).toHaveLength(3);
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['1'] },
{ action: rule.actions[0], alert: alerts['2'] },
{ action: actionWithAlertsFilter, alert: alerts['1'] },
]);
});
test('should set alerts as data', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: {
count: 1,
data: [
{ ...mockAAD, _id: alerts[1].getUuid(), [ALERT_UUID]: alerts[1].getUuid() },
{ ...mockAAD, _id: alerts[2].getUuid(), [ALERT_UUID]: alerts[2].getUuid() },
],
},
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const actionWithAlertsFilter: SanitizedRuleAction = {
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', filters: [] } },
};
const scheduler = new PerAlertActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [rule.actions[0], actionWithAlertsFilter] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', filters: [] } },
});
expect(executables).toHaveLength(4);
expect(alerts['1'].getAlertAsData()).not.toBeUndefined();
expect(alerts['2'].getAlertAsData()).not.toBeUndefined();
expect(executables).toEqual([
{ action: rule.actions[0], alert: alerts['1'] },
{ action: rule.actions[0], alert: alerts['2'] },
{ action: actionWithAlertsFilter, alert: alerts['1'] },
{ action: actionWithAlertsFilter, alert: alerts['2'] },
]);
});
});
});


@@ -0,0 +1,264 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { AlertInstanceState, AlertInstanceContext } from '@kbn/alerting-state-types';
import { RuleAction, RuleNotifyWhen, RuleTypeParams } from '@kbn/alerting-types';
import { compact } from 'lodash';
import { RuleTypeState, RuleAlertData, parseDuration } from '../../../../common';
import { GetSummarizedAlertsParams } from '../../../alerts_client/types';
import { AlertHit } from '../../../types';
import { Alert } from '../../../alert';
import { getSummarizedAlerts } from '../get_summarized_alerts';
import {
generateActionHash,
isActionOnInterval,
isSummaryAction,
logNumberOfFilteredAlerts,
} from '../rule_action_helper';
import {
ActionSchedulerOptions,
Executable,
GenerateExecutablesOpts,
IActionScheduler,
} from '../types';
enum Reasons {
MUTED = 'muted',
THROTTLED = 'throttled',
ACTION_GROUP_NOT_CHANGED = 'actionGroupHasNotChanged',
}
export class PerAlertActionScheduler<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
> implements IActionScheduler<State, Context, ActionGroupIds, RecoveryActionGroupId>
{
private actions: RuleAction[] = [];
private mutedAlertIdsSet: Set<string> = new Set();
private ruleTypeActionGroups?: Map<ActionGroupIds | RecoveryActionGroupId, string>;
private skippedAlerts: { [key: string]: { reason: string } } = {};
constructor(
private readonly context: ActionSchedulerOptions<
Params,
ExtractedParams,
RuleState,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>
) {
this.ruleTypeActionGroups = new Map(
context.ruleType.actionGroups.map((actionGroup) => [actionGroup.id, actionGroup.name])
);
this.mutedAlertIdsSet = new Set(context.rule.mutedInstanceIds);
const canGetSummarizedAlerts =
!!context.ruleType.alerts && !!context.alertsClient.getSummarizedAlerts;
// filter for per-alert actions; if the action has an alertsFilter, check that
// rule type supports summarized alerts and filter out if not
this.actions = compact(
(context.rule.actions ?? [])
.filter((action) => !isSummaryAction(action))
.map((action) => {
if (!canGetSummarizedAlerts && action.alertsFilter) {
this.context.logger.error(
`Skipping action "${action.id}" for rule "${this.context.rule.id}" because the rule type "${this.context.ruleType.name}" does not support alert-as-data.`
);
return null;
}
return action;
})
);
}
public get priority(): number {
return 2;
}
public async generateExecutables({
alerts,
}: GenerateExecutablesOpts<State, Context, ActionGroupIds, RecoveryActionGroupId>): Promise<
Array<Executable<State, Context, ActionGroupIds, RecoveryActionGroupId>>
> {
const executables = [];
const alertsArray = Object.entries(alerts);
for (const action of this.actions) {
let summarizedAlerts = null;
if (action.useAlertDataForTemplate || action.alertsFilter) {
const optionsBase = {
spaceId: this.context.taskInstance.params.spaceId,
ruleId: this.context.taskInstance.params.alertId,
excludedAlertInstanceIds: this.context.rule.mutedInstanceIds,
alertsFilter: action.alertsFilter,
};
let options: GetSummarizedAlertsParams;
if (isActionOnInterval(action)) {
const throttleMills = parseDuration(action.frequency!.throttle!);
const start = new Date(Date.now() - throttleMills);
options = { ...optionsBase, start, end: new Date() };
} else {
options = { ...optionsBase, executionUuid: this.context.executionId };
}
summarizedAlerts = await getSummarizedAlerts({
queryOptions: options,
alertsClient: this.context.alertsClient,
});
logNumberOfFilteredAlerts({
logger: this.context.logger,
numberOfAlerts: Object.entries(alerts).length,
numberOfSummarizedAlerts: summarizedAlerts.all.count,
action,
});
}
for (const [alertId, alert] of alertsArray) {
const alertMaintenanceWindowIds = alert.getMaintenanceWindowIds();
if (alertMaintenanceWindowIds.length !== 0) {
this.context.logger.debug(
`no scheduling of summary actions "${action.id}" for rule "${
this.context.taskInstance.params.alertId
}": has active maintenance windows ${alertMaintenanceWindowIds.join(', ')}.`
);
continue;
}
if (alert.isFilteredOut(summarizedAlerts)) {
continue;
}
const actionGroup =
alert.getScheduledActionOptions()?.actionGroup ||
this.context.ruleType.recoveryActionGroup.id;
if (!this.ruleTypeActionGroups!.has(actionGroup)) {
this.context.logger.error(
`Invalid action group "${actionGroup}" for rule "${this.context.ruleType.id}".`
);
continue;
}
// only actions with notifyWhen set to "on status change" should return
// notifications for flapping pending recovered alerts
if (
alert.getPendingRecoveredCount() > 0 &&
action?.frequency?.notifyWhen !== RuleNotifyWhen.CHANGE
) {
continue;
}
if (summarizedAlerts) {
const alertAsData = summarizedAlerts.all.data.find(
(alertHit: AlertHit) => alertHit._id === alert.getUuid()
);
if (alertAsData) {
alert.setAlertAsData(alertAsData);
}
}
if (action.group === actionGroup && !this.isAlertMuted(alertId)) {
if (
this.isRecoveredAlert(action.group) ||
this.isExecutableActiveAlert({ alert, action })
) {
executables.push({ action, alert });
}
}
}
}
return executables;
}
private isAlertMuted(alertId: string) {
const muted = this.mutedAlertIdsSet.has(alertId);
if (muted) {
if (
!this.skippedAlerts[alertId] ||
(this.skippedAlerts[alertId] && this.skippedAlerts[alertId].reason !== Reasons.MUTED)
) {
this.context.logger.debug(
`skipping scheduling of actions for '${alertId}' in rule ${this.context.ruleLabel}: rule is muted`
);
}
this.skippedAlerts[alertId] = { reason: Reasons.MUTED };
return true;
}
return false;
}
private isExecutableActiveAlert({
alert,
action,
}: {
alert: Alert<AlertInstanceState, AlertInstanceContext, ActionGroupIds | RecoveryActionGroupId>;
action: RuleAction;
}) {
const alertId = alert.getId();
const {
context: { rule, logger, ruleLabel },
} = this;
const notifyWhen = action.frequency?.notifyWhen || rule.notifyWhen;
if (notifyWhen === 'onActionGroupChange' && !alert.scheduledActionGroupHasChanged()) {
if (
!this.skippedAlerts[alertId] ||
(this.skippedAlerts[alertId] &&
this.skippedAlerts[alertId].reason !== Reasons.ACTION_GROUP_NOT_CHANGED)
) {
logger.debug(
`skipping scheduling of actions for '${alertId}' in rule ${ruleLabel}: alert is active but action group has not changed`
);
}
this.skippedAlerts[alertId] = { reason: Reasons.ACTION_GROUP_NOT_CHANGED };
return false;
}
if (notifyWhen === 'onThrottleInterval') {
const throttled = action.frequency?.throttle
? alert.isThrottled({
throttle: action.frequency.throttle ?? null,
actionHash: generateActionHash(action), // generateActionHash must be removed once all the hash identifiers are removed from the task state
uuid: action.uuid,
})
: alert.isThrottled({ throttle: rule.throttle ?? null });
if (throttled) {
if (
!this.skippedAlerts[alertId] ||
(this.skippedAlerts[alertId] && this.skippedAlerts[alertId].reason !== Reasons.THROTTLED)
) {
logger.debug(
`skipping scheduling of actions for '${alertId}' in rule ${ruleLabel}: rule is throttled`
);
}
this.skippedAlerts[alertId] = { reason: Reasons.THROTTLED };
return false;
}
}
return alert.hasScheduledActions();
}
private isRecoveredAlert(actionGroup: string) {
return actionGroup === this.context.ruleType.recoveryActionGroup.id;
}
}
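
For orientation, here is a minimal sketch (with simplified placeholder types, not the real Kibana imports) of how executables from the three schedulers can be combined in ascending `priority` order — summary (0), system (1), then per-alert (2):

```ts
// Simplified stand-in types; the real scheduler interfaces live in ./types.
interface SketchExecutable {
  actionId: string;
}

interface SketchScheduler {
  readonly priority: number;
  generateExecutables(): Promise<SketchExecutable[]>;
}

// Collect executables from each scheduler, lowest priority value first
// (summary = 0, system = 1, per-alert = 2 in this refactor).
async function collectExecutables(
  schedulers: SketchScheduler[]
): Promise<SketchExecutable[]> {
  const executables: SketchExecutable[] = [];
  for (const scheduler of [...schedulers].sort((a, b) => a.priority - b.priority)) {
    executables.push(...(await scheduler.generateExecutables()));
  }
  return executables;
}
```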


@ -0,0 +1,468 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import sinon from 'sinon';
import { actionsClientMock, actionsMock } from '@kbn/actions-plugin/server/mocks';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { alertsClientMock } from '../../../alerts_client/alerts_client.mock';
import { alertingEventLoggerMock } from '../../../lib/alerting_event_logger/alerting_event_logger.mock';
import { RuleRunMetricsStore } from '../../../lib/rule_run_metrics_store';
import { mockAAD } from '../../fixtures';
import { SummaryActionScheduler } from './summary_action_scheduler';
import { getRule, getRuleType, getDefaultSchedulerContext, generateAlert } from '../test_fixtures';
import { RuleAction } from '@kbn/alerting-types';
import { ALERT_UUID } from '@kbn/rule-data-utils';
import {
getErrorSource,
TaskErrorSource,
} from '@kbn/task-manager-plugin/server/task_running/errors';
const alertingEventLogger = alertingEventLoggerMock.create();
const actionsClient = actionsClientMock.create();
const alertsClient = alertsClientMock.create();
const mockActionsPlugin = actionsMock.createStart();
const logger = loggingSystemMock.create().get();
let ruleRunMetricsStore: RuleRunMetricsStore;
const rule = getRule({
actions: [
{
id: '1',
group: 'default',
actionTypeId: 'test',
frequency: { summary: false, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
},
{
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: { summary: true, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '222-222',
},
{
id: '3',
group: 'default',
actionTypeId: 'test',
frequency: { summary: true, notifyWhen: 'onActiveAlert', throttle: null },
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '333-333',
},
],
});
const ruleType = getRuleType();
const defaultSchedulerContext = getDefaultSchedulerContext(
logger,
mockActionsPlugin,
alertingEventLogger,
actionsClient,
alertsClient
);
// @ts-ignore
const getSchedulerContext = (params = {}) => {
return { ...defaultSchedulerContext, rule, ...params, ruleRunMetricsStore };
};
let clock: sinon.SinonFakeTimers;
describe('Summary Action Scheduler', () => {
beforeAll(() => {
clock = sinon.useFakeTimers();
});
beforeEach(() => {
jest.resetAllMocks();
mockActionsPlugin.isActionTypeEnabled.mockReturnValue(true);
mockActionsPlugin.isActionExecutable.mockReturnValue(true);
mockActionsPlugin.getActionsClientWithRequest.mockResolvedValue(actionsClient);
ruleRunMetricsStore = new RuleRunMetricsStore();
});
afterAll(() => {
clock.restore();
});
test('should initialize with only summary actions', () => {
const scheduler = new SummaryActionScheduler(getSchedulerContext());
// @ts-expect-error private variable
expect(scheduler.actions).toHaveLength(2);
// @ts-expect-error private variable
expect(scheduler.actions).toEqual([rule.actions[1], rule.actions[2]]);
expect(logger.error).not.toHaveBeenCalled();
});
test('should log if rule type does not support summarized alerts and not initialize any actions', () => {
const scheduler = new SummaryActionScheduler(
getSchedulerContext({ ruleType: { ...ruleType, alerts: undefined } })
);
// @ts-expect-error private variable
expect(scheduler.actions).toHaveLength(0);
expect(logger.error).toHaveBeenCalledTimes(2);
expect(logger.error).toHaveBeenNthCalledWith(
1,
`Skipping action \"2\" for rule \"1\" because the rule type \"Test\" does not support alert-as-data.`
);
expect(logger.error).toHaveBeenNthCalledWith(
2,
`Skipping action \"3\" for rule \"1\" because the rule type \"Test\" does not support alert-as-data.`
);
});
describe('generateExecutables', () => {
const newAlert1 = generateAlert({ id: 1 });
const newAlert2 = generateAlert({ id: 2 });
const alerts = { ...newAlert1, ...newAlert2 };
const summaryActionWithAlertFilter: RuleAction = {
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: {
summary: true,
notifyWhen: 'onActiveAlert',
throttle: null,
},
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', dsl: '{}', filters: [] } },
uuid: '222-222',
};
const summaryActionWithThrottle: RuleAction = {
id: '2',
group: 'default',
actionTypeId: 'test',
frequency: {
summary: true,
notifyWhen: 'onThrottleInterval',
throttle: '1d',
},
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '222-222',
};
test('should generate executable for summary action when summary action is per rule run', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: { count: 2, data: [mockAAD, mockAAD] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SummaryActionScheduler(getSchedulerContext());
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(2);
expect(alertsClient.getSummarizedAlerts).toHaveBeenNthCalledWith(1, {
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenNthCalledWith(2, {
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
});
expect(logger.debug).not.toHaveBeenCalled();
expect(executables).toHaveLength(2);
const finalSummary = { ...summarizedAlerts, all: { count: 2, data: [mockAAD, mockAAD] } };
expect(executables).toEqual([
{ action: rule.actions[1], summarizedAlerts: finalSummary },
{ action: rule.actions[2], summarizedAlerts: finalSummary },
]);
});
test('should generate executable for summary action when summary action has alertsFilter', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: { count: 2, data: [mockAAD, mockAAD] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SummaryActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [summaryActionWithAlertFilter] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', dsl: '{}', filters: [] } },
});
expect(logger.debug).not.toHaveBeenCalled();
expect(executables).toHaveLength(1);
const finalSummary = { ...summarizedAlerts, all: { count: 2, data: [mockAAD, mockAAD] } };
expect(executables).toEqual([
{ action: summaryActionWithAlertFilter, summarizedAlerts: finalSummary },
]);
});
test('should generate executable for summary action when summary action is throttled with no throttle history', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: { count: 2, data: [mockAAD, mockAAD] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SummaryActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [summaryActionWithThrottle] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
ruleId: '1',
spaceId: 'test1',
start: new Date('1969-12-31T00:00:00.000Z'),
end: new Date(),
});
expect(logger.debug).not.toHaveBeenCalled();
expect(executables).toHaveLength(1);
const finalSummary = { ...summarizedAlerts, all: { count: 2, data: [mockAAD, mockAAD] } };
expect(executables).toEqual([
{ action: summaryActionWithThrottle, summarizedAlerts: finalSummary },
]);
});
test('should skip generating executable for summary action when summary action is throttled', async () => {
const scheduler = new SummaryActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [summaryActionWithThrottle] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {
'222-222': { date: '1969-12-31T13:00:00.000Z' },
},
});
expect(alertsClient.getSummarizedAlerts).not.toHaveBeenCalled();
expect(logger.debug).toHaveBeenCalledWith(
`skipping scheduling the action 'test:2', summary action is still being throttled`
);
expect(executables).toHaveLength(0);
});
test('should remove new alerts from summary if suppressed by maintenance window', async () => {
const newAlertWithMaintenanceWindow = generateAlert({
id: 1,
maintenanceWindowIds: ['mw-1'],
});
const alertsWithMaintenanceWindow = { ...newAlertWithMaintenanceWindow, ...newAlert2 };
alertsClient.getProcessedAlerts.mockReturnValue(alertsWithMaintenanceWindow);
const newAADAlerts = [
{ ...mockAAD, [ALERT_UUID]: newAlertWithMaintenanceWindow[1].getUuid() },
mockAAD,
];
const summarizedAlerts = {
new: { count: 2, data: newAADAlerts },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SummaryActionScheduler(getSchedulerContext());
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(2);
expect(alertsClient.getSummarizedAlerts).toHaveBeenNthCalledWith(1, {
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenNthCalledWith(2, {
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
});
expect(logger.debug).toHaveBeenCalledTimes(2);
expect(logger.debug).toHaveBeenNthCalledWith(
1,
`(1) alert has been filtered out for: test:222-222`
);
expect(logger.debug).toHaveBeenNthCalledWith(
2,
`(1) alert has been filtered out for: test:333-333`
);
expect(executables).toHaveLength(2);
const finalSummary = {
all: { count: 1, data: [newAADAlerts[1]] },
new: { count: 1, data: [newAADAlerts[1]] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
expect(executables).toEqual([
{ action: rule.actions[1], summarizedAlerts: finalSummary },
{ action: rule.actions[2], summarizedAlerts: finalSummary },
]);
});
test('should generate executable for summary action and log when alerts have been filtered out by action condition', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: { count: 1, data: [mockAAD] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SummaryActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [summaryActionWithAlertFilter] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
alertsFilter: { query: { kql: 'kibana.alert.rule.name:foo', dsl: '{}', filters: [] } },
});
expect(logger.debug).toHaveBeenCalledTimes(1);
expect(logger.debug).toHaveBeenCalledWith(
`(1) alert has been filtered out for: test:222-222`
);
expect(executables).toHaveLength(1);
const finalSummary = { ...summarizedAlerts, all: { count: 1, data: [mockAAD] } };
expect(executables).toEqual([
{ action: summaryActionWithAlertFilter, summarizedAlerts: finalSummary },
]);
});
test('should skip generating executable for summary action when no alerts found', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: { count: 0, data: [] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SummaryActionScheduler({
...getSchedulerContext(),
rule: { ...rule, actions: [summaryActionWithThrottle] },
});
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
ruleId: '1',
spaceId: 'test1',
start: new Date('1969-12-31T00:00:00.000Z'),
end: new Date(),
});
expect(logger.debug).not.toHaveBeenCalled();
expect(executables).toHaveLength(0);
});
test('should throw framework error if getSummarizedAlerts throws error', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
alertsClient.getSummarizedAlerts.mockImplementation(() => {
throw new Error('no alerts for you');
});
const scheduler = new SummaryActionScheduler(getSchedulerContext());
try {
await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
} catch (err) {
expect(err.message).toEqual(`no alerts for you`);
expect(getErrorSource(err)).toBe(TaskErrorSource.FRAMEWORK);
}
});
});
});


@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { AlertInstanceState, AlertInstanceContext } from '@kbn/alerting-state-types';
import { RuleAction, RuleTypeParams } from '@kbn/alerting-types';
import { compact } from 'lodash';
import { RuleTypeState, RuleAlertData, parseDuration } from '../../../../common';
import { GetSummarizedAlertsParams } from '../../../alerts_client/types';
import { getSummarizedAlerts } from '../get_summarized_alerts';
import {
isActionOnInterval,
isSummaryAction,
isSummaryActionThrottled,
logNumberOfFilteredAlerts,
} from '../rule_action_helper';
import {
ActionSchedulerOptions,
Executable,
GenerateExecutablesOpts,
IActionScheduler,
} from '../types';
export class SummaryActionScheduler<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
> implements IActionScheduler<State, Context, ActionGroupIds, RecoveryActionGroupId>
{
private actions: RuleAction[] = [];
constructor(
private readonly context: ActionSchedulerOptions<
Params,
ExtractedParams,
RuleState,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>
) {
const canGetSummarizedAlerts =
!!context.ruleType.alerts && !!context.alertsClient.getSummarizedAlerts;
// filter for summary actions where the rule type supports summarized alerts
this.actions = compact(
(context.rule.actions ?? [])
.filter((action) => isSummaryAction(action))
.map((action) => {
if (!canGetSummarizedAlerts) {
this.context.logger.error(
`Skipping action "${action.id}" for rule "${this.context.rule.id}" because the rule type "${this.context.ruleType.name}" does not support alert-as-data.`
);
return null;
}
return action;
})
);
}
public get priority(): number {
return 0;
}
public async generateExecutables({
alerts,
throttledSummaryActions,
}: GenerateExecutablesOpts<State, Context, ActionGroupIds, RecoveryActionGroupId>): Promise<
Array<Executable<State, Context, ActionGroupIds, RecoveryActionGroupId>>
> {
const executables = [];
for (const action of this.actions) {
if (
// if summary action is throttled, we won't send any notifications
!isSummaryActionThrottled({ action, throttledSummaryActions, logger: this.context.logger })
) {
const actionHasThrottleInterval = isActionOnInterval(action);
const optionsBase = {
spaceId: this.context.taskInstance.params.spaceId,
ruleId: this.context.taskInstance.params.alertId,
excludedAlertInstanceIds: this.context.rule.mutedInstanceIds,
alertsFilter: action.alertsFilter,
};
let options: GetSummarizedAlertsParams;
if (actionHasThrottleInterval) {
const throttleMills = parseDuration(action.frequency!.throttle!);
const start = new Date(Date.now() - throttleMills);
options = { ...optionsBase, start, end: new Date() };
} else {
options = { ...optionsBase, executionUuid: this.context.executionId };
}
const summarizedAlerts = await getSummarizedAlerts({
queryOptions: options,
alertsClient: this.context.alertsClient,
});
if (!actionHasThrottleInterval) {
logNumberOfFilteredAlerts({
logger: this.context.logger,
numberOfAlerts: Object.entries(alerts).length,
numberOfSummarizedAlerts: summarizedAlerts.all.count,
action,
});
}
if (summarizedAlerts.all.count !== 0) {
executables.push({ action, summarizedAlerts });
}
}
}
return executables;
}
}
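
As a rough illustration of the throttle look-back window computed above, the following sketch uses a simplified duration parser; the real `parseDuration` helper from `../../../../common` supports more units than the `"<n>d"` form shown here:

```ts
// Sketch of the look-back window for an action on a throttle interval.
const DAY_MS = 24 * 60 * 60 * 1000;

function parseDurationSketch(duration: string): number {
  // Only handles the "<n>d" form used in the tests above.
  return Number(duration.replace(/d$/, '')) * DAY_MS;
}

function throttleWindow(throttle: string, now: Date = new Date()): { start: Date; end: Date } {
  return { start: new Date(now.getTime() - parseDurationSketch(throttle)), end: now };
}

// With the fake clock pinned to the epoch, throttleWindow('1d') yields
// start 1969-12-31T00:00:00.000Z and end 1970-01-01T00:00:00.000Z,
// matching the expectations in the tests above.
```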


@ -0,0 +1,218 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import sinon from 'sinon';
import { actionsClientMock, actionsMock } from '@kbn/actions-plugin/server/mocks';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { alertsClientMock } from '../../../alerts_client/alerts_client.mock';
import { alertingEventLoggerMock } from '../../../lib/alerting_event_logger/alerting_event_logger.mock';
import { RuleRunMetricsStore } from '../../../lib/rule_run_metrics_store';
import { mockAAD } from '../../fixtures';
import { getRule, getRuleType, getDefaultSchedulerContext, generateAlert } from '../test_fixtures';
import { SystemActionScheduler } from './system_action_scheduler';
import { ALERT_UUID } from '@kbn/rule-data-utils';
import {
getErrorSource,
TaskErrorSource,
} from '@kbn/task-manager-plugin/server/task_running/errors';
const alertingEventLogger = alertingEventLoggerMock.create();
const actionsClient = actionsClientMock.create();
const alertsClient = alertsClientMock.create();
const mockActionsPlugin = actionsMock.createStart();
const logger = loggingSystemMock.create().get();
let ruleRunMetricsStore: RuleRunMetricsStore;
const rule = getRule({
systemActions: [
{
id: '1',
actionTypeId: '.test-system-action',
params: { myParams: 'test' },
uuid: 'test',
},
],
});
const ruleType = getRuleType();
const defaultSchedulerContext = getDefaultSchedulerContext(
logger,
mockActionsPlugin,
alertingEventLogger,
actionsClient,
alertsClient
);
// @ts-ignore
const getSchedulerContext = (params = {}) => {
return { ...defaultSchedulerContext, rule, ...params, ruleRunMetricsStore };
};
let clock: sinon.SinonFakeTimers;
describe('System Action Scheduler', () => {
beforeAll(() => {
clock = sinon.useFakeTimers();
});
beforeEach(() => {
jest.resetAllMocks();
mockActionsPlugin.isActionTypeEnabled.mockReturnValue(true);
mockActionsPlugin.isActionExecutable.mockReturnValue(true);
mockActionsPlugin.getActionsClientWithRequest.mockResolvedValue(actionsClient);
ruleRunMetricsStore = new RuleRunMetricsStore();
});
afterAll(() => {
clock.restore();
});
test('should initialize with only system actions', () => {
const scheduler = new SystemActionScheduler(getSchedulerContext());
// @ts-expect-error private variable
expect(scheduler.actions).toHaveLength(1);
// @ts-expect-error private variable
expect(scheduler.actions).toEqual(rule.systemActions);
});
test('should not initialize any system actions if rule type does not support summarized alerts', () => {
const scheduler = new SystemActionScheduler(
getSchedulerContext({ ruleType: { ...ruleType, alerts: undefined } })
);
// @ts-expect-error private variable
expect(scheduler.actions).toHaveLength(0);
});
describe('generateExecutables', () => {
const newAlert1 = generateAlert({ id: 1 });
const newAlert2 = generateAlert({ id: 2 });
const alerts = { ...newAlert1, ...newAlert2 };
test('should generate executable for each system action', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: { count: 2, data: [mockAAD, mockAAD] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SystemActionScheduler(getSchedulerContext());
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
});
expect(executables).toHaveLength(1);
const finalSummary = { ...summarizedAlerts, all: { count: 2, data: [mockAAD, mockAAD] } };
expect(executables).toEqual([
{ action: rule.systemActions?.[0], summarizedAlerts: finalSummary },
]);
});
test('should remove new alerts from summary if suppressed by maintenance window', async () => {
const newAlertWithMaintenanceWindow = generateAlert({
id: 1,
maintenanceWindowIds: ['mw-1'],
});
const alertsWithMaintenanceWindow = { ...newAlertWithMaintenanceWindow, ...newAlert2 };
alertsClient.getProcessedAlerts.mockReturnValue(alertsWithMaintenanceWindow);
const newAADAlerts = [
{ ...mockAAD, [ALERT_UUID]: newAlertWithMaintenanceWindow[1].getUuid() },
mockAAD,
];
const summarizedAlerts = {
new: { count: 2, data: newAADAlerts },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SystemActionScheduler(getSchedulerContext());
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
});
expect(executables).toHaveLength(1);
const finalSummary = {
all: { count: 1, data: [newAADAlerts[1]] },
new: { count: 1, data: [newAADAlerts[1]] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
expect(executables).toEqual([
{ action: rule.systemActions?.[0], summarizedAlerts: finalSummary },
]);
});
test('should skip generating executable for summary action when no alerts found', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
const summarizedAlerts = {
new: { count: 0, data: [] },
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
};
alertsClient.getSummarizedAlerts.mockResolvedValue(summarizedAlerts);
const scheduler = new SystemActionScheduler(getSchedulerContext());
const executables = await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledTimes(1);
expect(alertsClient.getSummarizedAlerts).toHaveBeenCalledWith({
excludedAlertInstanceIds: [],
executionUuid: defaultSchedulerContext.executionId,
ruleId: '1',
spaceId: 'test1',
});
expect(executables).toHaveLength(0);
});
test('should throw framework error if getSummarizedAlerts throws error', async () => {
alertsClient.getProcessedAlerts.mockReturnValue(alerts);
alertsClient.getSummarizedAlerts.mockImplementation(() => {
throw new Error('no alerts for you');
});
const scheduler = new SystemActionScheduler(getSchedulerContext());
try {
await scheduler.generateExecutables({
alerts,
throttledSummaryActions: {},
});
} catch (err) {
expect(err.message).toEqual(`no alerts for you`);
expect(getErrorSource(err)).toBe(TaskErrorSource.FRAMEWORK);
}
});
});
});


@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { AlertInstanceState, AlertInstanceContext } from '@kbn/alerting-state-types';
import { RuleSystemAction, RuleTypeParams } from '@kbn/alerting-types';
import { RuleTypeState, RuleAlertData } from '../../../../common';
import { GetSummarizedAlertsParams } from '../../../alerts_client/types';
import { getSummarizedAlerts } from '../get_summarized_alerts';
import {
ActionSchedulerOptions,
Executable,
GenerateExecutablesOpts,
IActionScheduler,
} from '../types';
export class SystemActionScheduler<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
> implements IActionScheduler<State, Context, ActionGroupIds, RecoveryActionGroupId>
{
private actions: RuleSystemAction[] = [];
constructor(
private readonly context: ActionSchedulerOptions<
Params,
ExtractedParams,
RuleState,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>
) {
const canGetSummarizedAlerts =
!!context.ruleType.alerts && !!context.alertsClient.getSummarizedAlerts;
// only process system actions when rule type supports summarized alerts
this.actions = canGetSummarizedAlerts ? context.rule.systemActions ?? [] : [];
}
public get priority(): number {
return 1;
}
public async generateExecutables(
_: GenerateExecutablesOpts<State, Context, ActionGroupIds, RecoveryActionGroupId>
): Promise<Array<Executable<State, Context, ActionGroupIds, RecoveryActionGroupId>>> {
const executables = [];
for (const action of this.actions) {
const options: GetSummarizedAlertsParams = {
spaceId: this.context.taskInstance.params.spaceId,
ruleId: this.context.taskInstance.params.alertId,
excludedAlertInstanceIds: this.context.rule.mutedInstanceIds,
executionUuid: this.context.executionId,
};
const summarizedAlerts = await getSummarizedAlerts({
queryOptions: options,
alertsClient: this.context.alertsClient,
});
if (summarizedAlerts && summarizedAlerts.all.count !== 0) {
executables.push({ action, summarizedAlerts });
}
}
return executables;
}
}
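
The constructor guard above reduces to the following check, sketched with placeholder types rather than the real scheduler context:

```ts
// Sketch: system actions are only scheduled when the rule type registers an
// alerts definition and the alerts client can return summarized alerts.
interface SketchSchedulerContext {
  ruleType: { alerts?: object };
  alertsClient: { getSummarizedAlerts?: unknown };
  rule: { systemActions?: Array<{ id: string }> };
}

function selectSystemActions(context: SketchSchedulerContext): Array<{ id: string }> {
  const canGetSummarizedAlerts =
    !!context.ruleType.alerts && !!context.alertsClient.getSummarizedAlerts;
  return canGetSummarizedAlerts ? context.rule.systemActions ?? [] : [];
}
```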


@ -0,0 +1,208 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import {
AlertInstanceState,
AlertInstanceContext,
ThrottledActions,
} from '@kbn/alerting-state-types';
import { RuleTypeParams, SanitizedRule } from '@kbn/alerting-types';
import { schema } from '@kbn/config-schema';
import { KibanaRequest } from '@kbn/core-http-server';
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { ActionsClient, PluginStartContract } from '@kbn/actions-plugin/server';
import { PublicMethodsOf } from '@kbn/utility-types';
import { RuleAlertData, RuleTypeState } from '../../../common';
import { ConnectorAdapterRegistry } from '../../connector_adapters/connector_adapter_registry';
import { NormalizedRuleType } from '../../rule_type_registry';
import { TaskRunnerContext } from '../types';
import { AlertingEventLogger } from '../../lib/alerting_event_logger/alerting_event_logger';
import { Alert } from '../../alert';
const apiKey = Buffer.from('123:abc').toString('base64');
type ActiveActionGroup = 'default' | 'other-group';
export const generateAlert = ({
id,
group = 'default',
context,
state,
scheduleActions = true,
throttledActions = {},
lastScheduledActionsGroup = 'default',
maintenanceWindowIds,
pendingRecoveredCount,
activeCount,
}: {
id: number;
group?: ActiveActionGroup | 'recovered';
context?: AlertInstanceContext;
state?: AlertInstanceState;
scheduleActions?: boolean;
throttledActions?: ThrottledActions;
lastScheduledActionsGroup?: string;
maintenanceWindowIds?: string[];
pendingRecoveredCount?: number;
activeCount?: number;
}) => {
const alert = new Alert<AlertInstanceState, AlertInstanceContext, 'default' | 'other-group'>(
String(id),
{
state: state || { test: true },
meta: {
maintenanceWindowIds,
lastScheduledActions: {
date: new Date().toISOString(),
group: lastScheduledActionsGroup,
actions: throttledActions,
},
pendingRecoveredCount,
activeCount,
},
}
);
if (scheduleActions) {
alert.scheduleActions(group as ActiveActionGroup);
}
if (context) {
alert.setContext(context);
}
return { [id]: alert };
};
export const generateRecoveredAlert = ({
id,
state,
}: {
id: number;
state?: AlertInstanceState;
}) => {
const alert = new Alert<AlertInstanceState, AlertInstanceContext, 'recovered'>(String(id), {
state: state || { test: true },
meta: {
lastScheduledActions: {
date: new Date().toISOString(),
group: 'recovered',
actions: {},
},
},
});
return { [id]: alert };
};
export const getRule = (overrides = {}) =>
({
id: '1',
name: 'name-of-alert',
tags: ['tag-A', 'tag-B'],
mutedInstanceIds: [],
params: {
foo: true,
contextVal: 'My other {{context.value}} goes here',
stateVal: 'My other {{state.value}} goes here',
},
schedule: { interval: '1m' },
notifyWhen: 'onActiveAlert',
actions: [
{
id: '1',
group: 'default',
actionTypeId: 'test',
params: {
foo: true,
contextVal: 'My {{context.value}} goes here',
stateVal: 'My {{state.value}} goes here',
alertVal:
'My {{rule.id}} {{rule.name}} {{rule.spaceId}} {{rule.tags}} {{alert.id}} goes here',
},
uuid: '111-111',
},
],
consumer: 'test-consumer',
...overrides,
} as unknown as SanitizedRule<RuleTypeParams>);
export const getRuleType = (): NormalizedRuleType<
RuleTypeParams,
RuleTypeParams,
RuleTypeState,
AlertInstanceState,
AlertInstanceContext,
'default' | 'other-group',
'recovered',
{}
> => ({
id: 'test',
name: 'Test',
actionGroups: [
{ id: 'default', name: 'Default' },
{ id: 'recovered', name: 'Recovered' },
{ id: 'other-group', name: 'Other Group' },
],
defaultActionGroupId: 'default',
minimumLicenseRequired: 'basic',
isExportable: true,
recoveryActionGroup: {
id: 'recovered',
name: 'Recovered',
},
executor: jest.fn(),
category: 'test',
producer: 'alerts',
validate: {
params: schema.any(),
},
alerts: {
context: 'context',
mappings: { fieldMap: { field: { type: 'fieldType', required: false } } },
},
autoRecoverAlerts: false,
validLegacyConsumers: [],
});
export const getDefaultSchedulerContext = <
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
>(
loggerMock: Logger,
actionsPluginMock: jest.Mocked<PluginStartContract>,
alertingEventLoggerMock: jest.Mocked<AlertingEventLogger>,
actionsClientMock: jest.Mocked<PublicMethodsOf<ActionsClient>>,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
alertsClientMock: jest.Mocked<any>
) => ({
rule: getRule(),
ruleType: getRuleType(),
logger: loggerMock,
taskRunnerContext: {
actionsConfigMap: {
default: {
max: 1000,
},
},
actionsPlugin: actionsPluginMock,
connectorAdapterRegistry: new ConnectorAdapterRegistry(),
} as unknown as TaskRunnerContext,
apiKey,
ruleConsumer: 'rule-consumer',
executionId: '5f6aa57d-3e22-484e-bae8-cbed868f4d28',
alertUuid: 'uuid-1',
ruleLabel: 'rule-label',
request: {} as KibanaRequest,
alertingEventLogger: alertingEventLoggerMock,
previousStartedAt: null,
taskInstance: {
params: { spaceId: 'test1', alertId: '1' },
} as unknown as ConcreteTaskInstance,
actionsClient: actionsClientMock,
alertsClient: alertsClientMock,
});


@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import { PublicMethodsOf } from '@kbn/utility-types';
import { ActionsClient } from '@kbn/actions-plugin/server/actions_client';
import { IAlertsClient } from '../../alerts_client/types';
import { Alert } from '../../alert';
import {
AlertInstanceContext,
AlertInstanceState,
RuleTypeParams,
SanitizedRule,
RuleTypeState,
RuleAction,
RuleAlertData,
RuleSystemAction,
ThrottledActions,
} from '../../../common';
import { NormalizedRuleType } from '../../rule_type_registry';
import { CombinedSummarizedAlerts, RawRule } from '../../types';
import { RuleRunMetricsStore } from '../../lib/rule_run_metrics_store';
import { AlertingEventLogger } from '../../lib/alerting_event_logger/alerting_event_logger';
import { RuleTaskInstance, TaskRunnerContext } from '../types';
export interface ActionSchedulerOptions<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
> {
ruleType: NormalizedRuleType<
Params,
ExtractedParams,
RuleState,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>;
logger: Logger;
alertingEventLogger: PublicMethodsOf<AlertingEventLogger>;
rule: SanitizedRule<Params>;
taskRunnerContext: TaskRunnerContext;
taskInstance: RuleTaskInstance;
ruleRunMetricsStore: RuleRunMetricsStore;
apiKey: RawRule['apiKey'];
ruleConsumer: string;
executionId: string;
ruleLabel: string;
previousStartedAt: Date | null;
actionsClient: PublicMethodsOf<ActionsClient>;
alertsClient: IAlertsClient<AlertData, State, Context, ActionGroupIds, RecoveryActionGroupId>;
}
export type Executable<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string
> = {
action: RuleAction | RuleSystemAction;
} & (
| {
alert: Alert<State, Context, ActionGroupIds | RecoveryActionGroupId>;
summarizedAlerts?: never;
}
| {
alert?: never;
summarizedAlerts: CombinedSummarizedAlerts;
}
);
export interface GenerateExecutablesOpts<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string
> {
alerts: Record<string, Alert<State, Context, ActionGroupIds | RecoveryActionGroupId>>;
throttledSummaryActions: ThrottledActions;
}
export interface IActionScheduler<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string
> {
get priority(): number;
generateExecutables(
opts: GenerateExecutablesOpts<State, Context, ActionGroupIds, RecoveryActionGroupId>
): Promise<Array<Executable<State, Context, ActionGroupIds, RecoveryActionGroupId>>>;
}
export interface RuleUrl {
absoluteUrl?: string;
kibanaBaseUrl?: string;
basePathname?: string;
spaceIdSegment?: string;
relativePath?: string;
}
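
The `Executable` union above makes `alert` and `summarizedAlerts` mutually exclusive. A sketch with placeholder types (standing in for `Alert` and `CombinedSummarizedAlerts`) shows the shape the schedulers rely on:

```ts
// Placeholder types standing in for Alert and CombinedSummarizedAlerts.
type SketchAlert = { id: string };
type SketchSummary = { all: { count: number } };

type SketchExecutable = { action: { id: string } } & (
  | { alert: SketchAlert; summarizedAlerts?: never }
  | { alert?: never; summarizedAlerts: SketchSummary }
);

// Per-alert schedulers produce the first variant; summary and system
// schedulers produce the second. Supplying both fails to type-check.
const perAlertExecutable: SketchExecutable = { action: { id: '1' }, alert: { id: 'alert-1' } };
const summaryExecutable: SketchExecutable = {
  action: { id: '2' },
  summarizedAlerts: { all: { count: 3 } },
};
export { perAlertExecutable, summaryExecutable };
```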


@ -1,975 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { PublicMethodsOf } from '@kbn/utility-types';
import { Logger } from '@kbn/core/server';
import { ALERT_UUID, getRuleDetailsRoute, triggersActionsRoute } from '@kbn/rule-data-utils';
import { asSavedObjectExecutionSource } from '@kbn/actions-plugin/server';
import {
createTaskRunError,
isEphemeralTaskRejectedDueToCapacityError,
TaskErrorSource,
} from '@kbn/task-manager-plugin/server';
import {
ExecuteOptions as EnqueueExecutionOptions,
ExecutionResponseItem,
ExecutionResponseType,
} from '@kbn/actions-plugin/server/create_execute_function';
import { ActionsCompletion } from '@kbn/alerting-state-types';
import { ActionsClient } from '@kbn/actions-plugin/server/actions_client';
import { chunk } from 'lodash';
import { GetSummarizedAlertsParams, IAlertsClient } from '../alerts_client/types';
import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger';
import { AlertHit, parseDuration, CombinedSummarizedAlerts, ThrottledActions } from '../types';
import { RuleRunMetricsStore } from '../lib/rule_run_metrics_store';
import { injectActionParams } from './inject_action_params';
import { Executable, ExecutionHandlerOptions, RuleTaskInstance, TaskRunnerContext } from './types';
import {
transformActionParams,
TransformActionParamsOptions,
transformSummaryActionParams,
} from './transform_action_params';
import { Alert } from '../alert';
import { NormalizedRuleType } from '../rule_type_registry';
import {
AlertInstanceContext,
AlertInstanceState,
RuleAction,
RuleTypeParams,
RuleTypeState,
SanitizedRule,
RuleAlertData,
RuleNotifyWhen,
RuleSystemAction,
} from '../../common';
import {
generateActionHash,
getSummaryActionsFromTaskState,
getSummaryActionTimeBounds,
isActionOnInterval,
isSummaryAction,
isSummaryActionOnInterval,
isSummaryActionThrottled,
} from './rule_action_helper';
import { RULE_SAVED_OBJECT_TYPE } from '../saved_objects';
import { ConnectorAdapter } from '../connector_adapters/types';
import { withAlertingSpan } from './lib';
enum Reasons {
MUTED = 'muted',
THROTTLED = 'throttled',
ACTION_GROUP_NOT_CHANGED = 'actionGroupHasNotChanged',
}
interface LogAction {
id: string;
typeId: string;
alertId?: string;
alertGroup?: string;
alertSummary?: {
new: number;
ongoing: number;
recovered: number;
};
}
interface RunSummarizedActionArgs {
action: RuleAction;
summarizedAlerts: CombinedSummarizedAlerts;
spaceId: string;
bulkActions: EnqueueExecutionOptions[];
}
interface RunSystemActionArgs<Params extends RuleTypeParams> {
action: RuleSystemAction;
connectorAdapter: ConnectorAdapter;
summarizedAlerts: CombinedSummarizedAlerts;
rule: SanitizedRule<Params>;
ruleProducer: string;
spaceId: string;
bulkActions: EnqueueExecutionOptions[];
}
interface RunActionArgs<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string
> {
action: RuleAction;
alert: Alert<State, Context, ActionGroupIds | RecoveryActionGroupId>;
ruleId: string;
spaceId: string;
bulkActions: EnqueueExecutionOptions[];
}
export interface RunResult {
throttledSummaryActions: ThrottledActions;
}
export interface RuleUrl {
absoluteUrl?: string;
kibanaBaseUrl?: string;
basePathname?: string;
spaceIdSegment?: string;
relativePath?: string;
}
export class ExecutionHandler<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
> {
private logger: Logger;
private alertingEventLogger: PublicMethodsOf<AlertingEventLogger>;
private rule: SanitizedRule<Params>;
private ruleType: NormalizedRuleType<
Params,
ExtractedParams,
RuleState,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>;
private taskRunnerContext: TaskRunnerContext;
private taskInstance: RuleTaskInstance;
private ruleRunMetricsStore: RuleRunMetricsStore;
private apiKey: string | null;
private ruleConsumer: string;
private executionId: string;
private ruleLabel: string;
private ephemeralActionsToSchedule: number;
private CHUNK_SIZE = 1000;
private skippedAlerts: { [key: string]: { reason: string } } = {};
private actionsClient: PublicMethodsOf<ActionsClient>;
private ruleTypeActionGroups?: Map<ActionGroupIds | RecoveryActionGroupId, string>;
private mutedAlertIdsSet: Set<string> = new Set();
private previousStartedAt: Date | null;
private alertsClient: IAlertsClient<
AlertData,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId
>;
constructor({
rule,
ruleType,
logger,
alertingEventLogger,
taskRunnerContext,
taskInstance,
ruleRunMetricsStore,
apiKey,
ruleConsumer,
executionId,
ruleLabel,
previousStartedAt,
actionsClient,
alertsClient,
}: ExecutionHandlerOptions<
Params,
ExtractedParams,
RuleState,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>) {
this.logger = logger;
this.alertingEventLogger = alertingEventLogger;
this.rule = rule;
this.ruleType = ruleType;
this.taskRunnerContext = taskRunnerContext;
this.taskInstance = taskInstance;
this.ruleRunMetricsStore = ruleRunMetricsStore;
this.apiKey = apiKey;
this.ruleConsumer = ruleConsumer;
this.executionId = executionId;
this.ruleLabel = ruleLabel;
this.actionsClient = actionsClient;
this.ephemeralActionsToSchedule = taskRunnerContext.maxEphemeralActionsPerRule;
this.ruleTypeActionGroups = new Map(
ruleType.actionGroups.map((actionGroup) => [actionGroup.id, actionGroup.name])
);
this.previousStartedAt = previousStartedAt;
this.mutedAlertIdsSet = new Set(rule.mutedInstanceIds);
this.alertsClient = alertsClient;
}
public async run(
alerts: Record<string, Alert<State, Context, ActionGroupIds | RecoveryActionGroupId>>
): Promise<RunResult> {
const throttledSummaryActions: ThrottledActions = getSummaryActionsFromTaskState({
actions: this.rule.actions,
summaryActions: this.taskInstance.state?.summaryActions,
});
const executables = await this.generateExecutables(alerts, throttledSummaryActions);
if (executables.length === 0) {
return { throttledSummaryActions };
}
const {
CHUNK_SIZE,
logger,
alertingEventLogger,
ruleRunMetricsStore,
taskRunnerContext: { actionsConfigMap },
taskInstance: {
params: { spaceId, alertId: ruleId },
},
} = this;
const logActions: Record<string, LogAction> = {};
const bulkActions: EnqueueExecutionOptions[] = [];
let bulkActionsResponse: ExecutionResponseItem[] = [];
this.ruleRunMetricsStore.incrementNumberOfGeneratedActions(executables.length);
for (const { action, alert, summarizedAlerts } of executables) {
const { actionTypeId } = action;
ruleRunMetricsStore.incrementNumberOfGeneratedActionsByConnectorType(actionTypeId);
if (ruleRunMetricsStore.hasReachedTheExecutableActionsLimit(actionsConfigMap)) {
ruleRunMetricsStore.setTriggeredActionsStatusByConnectorType({
actionTypeId,
status: ActionsCompletion.PARTIAL,
});
logger.debug(
`Rule "${this.rule.id}" skipped scheduling action "${action.id}" because the maximum number of allowed actions has been reached.`
);
break;
}
if (
ruleRunMetricsStore.hasReachedTheExecutableActionsLimitByConnectorType({
actionTypeId,
actionsConfigMap,
})
) {
if (!ruleRunMetricsStore.hasConnectorTypeReachedTheLimit(actionTypeId)) {
logger.debug(
`Rule "${this.rule.id}" skipped scheduling action "${action.id}" because the maximum number of allowed actions for connector type ${actionTypeId} has been reached.`
);
}
ruleRunMetricsStore.setTriggeredActionsStatusByConnectorType({
actionTypeId,
status: ActionsCompletion.PARTIAL,
});
continue;
}
if (!this.isExecutableAction(action)) {
this.logger.warn(
`Rule "${this.taskInstance.params.alertId}" skipped scheduling action "${action.id}" because it is disabled`
);
continue;
}
ruleRunMetricsStore.incrementNumberOfTriggeredActions();
ruleRunMetricsStore.incrementNumberOfTriggeredActionsByConnectorType(actionTypeId);
if (!this.isSystemAction(action) && summarizedAlerts) {
const defaultAction = action as RuleAction;
if (isActionOnInterval(action)) {
throttledSummaryActions[defaultAction.uuid!] = { date: new Date().toISOString() };
}
logActions[defaultAction.id] = await this.runSummarizedAction({
action,
summarizedAlerts,
spaceId,
bulkActions,
});
} else if (summarizedAlerts && this.isSystemAction(action)) {
const hasConnectorAdapter = this.taskRunnerContext.connectorAdapterRegistry.has(
action.actionTypeId
);
/**
 * System actions without an adapter cannot be executed.
 */
if (!hasConnectorAdapter) {
this.logger.warn(
`Rule "${this.taskInstance.params.alertId}" skipped scheduling system action "${action.id}" because no connector adapter is configured`
);
continue;
}
const connectorAdapter = this.taskRunnerContext.connectorAdapterRegistry.get(
action.actionTypeId
);
logActions[action.id] = await this.runSystemAction({
action,
connectorAdapter,
summarizedAlerts,
rule: this.rule,
ruleProducer: this.ruleType.producer,
spaceId,
bulkActions,
});
} else if (!this.isSystemAction(action) && alert) {
const defaultAction = action as RuleAction;
logActions[defaultAction.id] = await this.runAction({
action,
spaceId,
alert,
ruleId,
bulkActions,
});
const actionGroup = defaultAction.group;
if (!this.isRecoveredAlert(actionGroup)) {
if (isActionOnInterval(action)) {
alert.updateLastScheduledActions(
defaultAction.group as ActionGroupIds,
generateActionHash(action),
defaultAction.uuid
);
} else {
alert.updateLastScheduledActions(defaultAction.group as ActionGroupIds);
}
alert.unscheduleActions();
}
}
}
if (!!bulkActions.length) {
for (const c of chunk(bulkActions, CHUNK_SIZE)) {
let enqueueResponse;
try {
enqueueResponse = await withAlertingSpan('alerting:bulk-enqueue-actions', () =>
this.actionsClient!.bulkEnqueueExecution(c)
);
} catch (e) {
if (e.statusCode === 404) {
throw createTaskRunError(e, TaskErrorSource.USER);
}
throw createTaskRunError(e, TaskErrorSource.FRAMEWORK);
}
if (enqueueResponse.errors) {
bulkActionsResponse = bulkActionsResponse.concat(
enqueueResponse.items.filter(
(i) => i.response === ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR
)
);
}
}
}
if (!!bulkActionsResponse.length) {
for (const r of bulkActionsResponse) {
if (r.response === ExecutionResponseType.QUEUED_ACTIONS_LIMIT_ERROR) {
ruleRunMetricsStore.setHasReachedQueuedActionsLimit(true);
ruleRunMetricsStore.decrementNumberOfTriggeredActions();
ruleRunMetricsStore.decrementNumberOfTriggeredActionsByConnectorType(r.actionTypeId);
ruleRunMetricsStore.setTriggeredActionsStatusByConnectorType({
actionTypeId: r.actionTypeId,
status: ActionsCompletion.PARTIAL,
});
logger.debug(
`Rule "${this.rule.id}" skipped scheduling action "${r.id}" because the maximum number of queued actions has been reached.`
);
delete logActions[r.id];
}
}
}
const logActionsValues = Object.values(logActions);
if (!!logActionsValues.length) {
for (const action of logActionsValues) {
alertingEventLogger.logAction(action);
}
}
return { throttledSummaryActions };
}
private async runSummarizedAction({
action,
summarizedAlerts,
spaceId,
bulkActions,
}: RunSummarizedActionArgs): Promise<LogAction> {
const { start, end } = getSummaryActionTimeBounds(
action,
this.rule.schedule,
this.previousStartedAt
);
const ruleUrl = this.buildRuleUrl(spaceId, start, end);
const actionToRun = {
...action,
params: injectActionParams({
actionTypeId: action.actionTypeId,
ruleUrl,
ruleName: this.rule.name,
actionParams: transformSummaryActionParams({
alerts: summarizedAlerts,
rule: this.rule,
ruleTypeId: this.ruleType.id,
actionId: action.id,
actionParams: action.params,
spaceId,
actionsPlugin: this.taskRunnerContext.actionsPlugin,
actionTypeId: action.actionTypeId,
kibanaBaseUrl: this.taskRunnerContext.kibanaBaseUrl,
ruleUrl: ruleUrl?.absoluteUrl,
}),
}),
};
await this.actionRunOrAddToBulk({
enqueueOptions: this.getEnqueueOptions(actionToRun),
bulkActions,
});
return {
id: action.id,
typeId: action.actionTypeId,
alertSummary: {
new: summarizedAlerts.new.count,
ongoing: summarizedAlerts.ongoing.count,
recovered: summarizedAlerts.recovered.count,
},
};
}
private async runSystemAction({
action,
spaceId,
connectorAdapter,
summarizedAlerts,
rule,
ruleProducer,
bulkActions,
}: RunSystemActionArgs<Params>): Promise<LogAction> {
const ruleUrl = this.buildRuleUrl(spaceId);
const connectorAdapterActionParams = connectorAdapter.buildActionParams({
alerts: summarizedAlerts,
rule: {
id: rule.id,
tags: rule.tags,
name: rule.name,
consumer: rule.consumer,
producer: ruleProducer,
},
ruleUrl: ruleUrl?.absoluteUrl,
spaceId,
params: action.params,
});
const actionToRun = Object.assign(action, { params: connectorAdapterActionParams });
await this.actionRunOrAddToBulk({
enqueueOptions: this.getEnqueueOptions(actionToRun),
bulkActions,
});
return {
id: action.id,
typeId: action.actionTypeId,
alertSummary: {
new: summarizedAlerts.new.count,
ongoing: summarizedAlerts.ongoing.count,
recovered: summarizedAlerts.recovered.count,
},
};
}
private async runAction({
action,
spaceId,
alert,
ruleId,
bulkActions,
}: RunActionArgs<State, Context, ActionGroupIds, RecoveryActionGroupId>): Promise<LogAction> {
const ruleUrl = this.buildRuleUrl(spaceId);
const executableAlert = alert!;
const actionGroup = action.group as ActionGroupIds;
const transformActionParamsOptions: TransformActionParamsOptions = {
actionsPlugin: this.taskRunnerContext.actionsPlugin,
alertId: ruleId,
alertType: this.ruleType.id,
actionTypeId: action.actionTypeId,
alertName: this.rule.name,
spaceId,
tags: this.rule.tags,
alertInstanceId: executableAlert.getId(),
alertUuid: executableAlert.getUuid(),
alertActionGroup: actionGroup,
alertActionGroupName: this.ruleTypeActionGroups!.get(actionGroup)!,
context: executableAlert.getContext(),
actionId: action.id,
state: executableAlert.getState(),
kibanaBaseUrl: this.taskRunnerContext.kibanaBaseUrl,
alertParams: this.rule.params,
actionParams: action.params,
flapping: executableAlert.getFlapping(),
ruleUrl: ruleUrl?.absoluteUrl,
};
if (executableAlert.isAlertAsData()) {
transformActionParamsOptions.aadAlert = executableAlert.getAlertAsData();
}
const actionToRun = {
...action,
params: injectActionParams({
actionTypeId: action.actionTypeId,
ruleUrl,
ruleName: this.rule.name,
actionParams: transformActionParams(transformActionParamsOptions),
}),
};
await this.actionRunOrAddToBulk({
enqueueOptions: this.getEnqueueOptions(actionToRun),
bulkActions,
});
return {
id: action.id,
typeId: action.actionTypeId,
alertId: alert.getId(),
alertGroup: action.group,
};
}
private logNumberOfFilteredAlerts({
numberOfAlerts = 0,
numberOfSummarizedAlerts = 0,
action,
}: {
numberOfAlerts: number;
numberOfSummarizedAlerts: number;
action: RuleAction | RuleSystemAction;
}) {
const count = numberOfAlerts - numberOfSummarizedAlerts;
if (count > 0) {
this.logger.debug(
`(${count}) alert${count > 1 ? 's' : ''} ${
count > 1 ? 'have' : 'has'
} been filtered out for: ${action.actionTypeId}:${action.uuid}`
);
}
}
private isAlertMuted(alertId: string) {
const muted = this.mutedAlertIdsSet.has(alertId);
if (muted) {
if (
!this.skippedAlerts[alertId] ||
(this.skippedAlerts[alertId] && this.skippedAlerts[alertId].reason !== Reasons.MUTED)
) {
this.logger.debug(
`skipping scheduling of actions for '${alertId}' in rule ${this.ruleLabel}: rule is muted`
);
}
this.skippedAlerts[alertId] = { reason: Reasons.MUTED };
return true;
}
return false;
}
private isExecutableAction(action: RuleAction | RuleSystemAction) {
return this.taskRunnerContext.actionsPlugin.isActionExecutable(action.id, action.actionTypeId, {
notifyUsage: true,
});
}
private isSystemAction(action?: RuleAction | RuleSystemAction): action is RuleSystemAction {
return this.taskRunnerContext.actionsPlugin.isSystemActionConnector(action?.id ?? '');
}
private isRecoveredAlert(actionGroup: string) {
return actionGroup === this.ruleType.recoveryActionGroup.id;
}
private isExecutableActiveAlert({
alert,
action,
}: {
alert: Alert<AlertInstanceState, AlertInstanceContext, ActionGroupIds | RecoveryActionGroupId>;
action: RuleAction;
}) {
const alertId = alert.getId();
const { rule, ruleLabel, logger } = this;
const notifyWhen = action.frequency?.notifyWhen || rule.notifyWhen;
if (notifyWhen === 'onActionGroupChange' && !alert.scheduledActionGroupHasChanged()) {
if (
!this.skippedAlerts[alertId] ||
(this.skippedAlerts[alertId] &&
this.skippedAlerts[alertId].reason !== Reasons.ACTION_GROUP_NOT_CHANGED)
) {
logger.debug(
`skipping scheduling of actions for '${alertId}' in rule ${ruleLabel}: alert is active but action group has not changed`
);
}
this.skippedAlerts[alertId] = { reason: Reasons.ACTION_GROUP_NOT_CHANGED };
return false;
}
if (notifyWhen === 'onThrottleInterval') {
const throttled = action.frequency?.throttle
? alert.isThrottled({
throttle: action.frequency.throttle ?? null,
actionHash: generateActionHash(action), // generateActionHash must be removed once all of the hash identifiers are removed from the task state
uuid: action.uuid,
})
: alert.isThrottled({ throttle: rule.throttle ?? null });
if (throttled) {
if (
!this.skippedAlerts[alertId] ||
(this.skippedAlerts[alertId] && this.skippedAlerts[alertId].reason !== Reasons.THROTTLED)
) {
logger.debug(
`skipping scheduling of actions for '${alertId}' in rule ${ruleLabel}: rule is throttled`
);
}
this.skippedAlerts[alertId] = { reason: Reasons.THROTTLED };
return false;
}
}
return alert.hasScheduledActions();
}
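/** Returns the alert's scheduled action group, falling back to the recovery action group. */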
private getActionGroup(alert: Alert<State, Context, ActionGroupIds | RecoveryActionGroupId>) {
return alert.getScheduledActionOptions()?.actionGroup || this.ruleType.recoveryActionGroup.id;
}
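/**
 * Builds the rule details (or rule-type specific "view in app") URL from the Kibana base URL,
 * base path, and space; returns undefined when no base URL is configured or URL construction fails.
 */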
private buildRuleUrl(spaceId: string, start?: number, end?: number): RuleUrl | undefined {
if (!this.taskRunnerContext.kibanaBaseUrl) {
return;
}
const relativePath = this.ruleType.getViewInAppRelativeUrl
? this.ruleType.getViewInAppRelativeUrl({ rule: this.rule, start, end })
: `${triggersActionsRoute}${getRuleDetailsRoute(this.rule.id)}`;
try {
const basePathname = new URL(this.taskRunnerContext.kibanaBaseUrl).pathname;
const basePathnamePrefix = basePathname !== '/' ? `${basePathname}` : '';
const spaceIdSegment = spaceId !== 'default' ? `/s/${spaceId}` : '';
const ruleUrl = new URL(
[basePathnamePrefix, spaceIdSegment, relativePath].join(''),
this.taskRunnerContext.kibanaBaseUrl
);
return {
absoluteUrl: ruleUrl.toString(),
kibanaBaseUrl: this.taskRunnerContext.kibanaBaseUrl,
basePathname: basePathnamePrefix,
spaceIdSegment,
relativePath,
};
} catch (error) {
this.logger.debug(
`Rule "${this.rule.id}" encountered an error while constructing the rule.url variable: ${error.message}`
);
return;
}
}
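/**
 * Maps an action to the enqueue options expected by the actions plugin, including the API key,
 * rule consumer, execution id, saved-object execution source, and related saved objects.
 */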
private getEnqueueOptions(action: RuleAction | RuleSystemAction): EnqueueExecutionOptions {
const {
apiKey,
ruleConsumer,
executionId,
taskInstance: {
params: { spaceId, alertId: ruleId },
},
} = this;
const namespace = spaceId === 'default' ? {} : { namespace: spaceId };
return {
id: action.id,
params: action.params,
spaceId,
apiKey: apiKey ?? null,
consumer: ruleConsumer,
source: asSavedObjectExecutionSource({
id: ruleId,
type: RULE_SAVED_OBJECT_TYPE,
}),
executionId,
relatedSavedObjects: [
{
id: ruleId,
type: RULE_SAVED_OBJECT_TYPE,
namespace: namespace.namespace,
typeId: this.ruleType.id,
},
],
actionTypeId: action.actionTypeId,
};
}
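/**
 * Builds the list of executables for this execution: summary executables for summary actions
 * with matching alerts, per-alert executables for alerts that are not muted, throttled,
 * filtered out, or under a maintenance window, and summary executables for system actions.
 */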
private async generateExecutables(
alerts: Record<string, Alert<State, Context, ActionGroupIds | RecoveryActionGroupId>>,
throttledSummaryActions: ThrottledActions
): Promise<Array<Executable<State, Context, ActionGroupIds, RecoveryActionGroupId>>> {
const executables = [];
for (const action of this.rule.actions) {
const alertsArray = Object.entries(alerts);
let summarizedAlerts = null;
if (this.shouldGetSummarizedAlerts({ action, throttledSummaryActions })) {
summarizedAlerts = await this.getSummarizedAlerts({
action,
spaceId: this.taskInstance.params.spaceId,
ruleId: this.taskInstance.params.alertId,
});
if (!isSummaryActionOnInterval(action)) {
this.logNumberOfFilteredAlerts({
numberOfAlerts: alertsArray.length,
numberOfSummarizedAlerts: summarizedAlerts.all.count,
action,
});
}
}
if (isSummaryAction(action)) {
if (summarizedAlerts && summarizedAlerts.all.count !== 0) {
executables.push({ action, summarizedAlerts });
}
continue;
}
for (const [alertId, alert] of alertsArray) {
const alertMaintenanceWindowIds = alert.getMaintenanceWindowIds();
if (alertMaintenanceWindowIds.length !== 0) {
this.logger.debug(
`no scheduling of actions "${action.id}" for rule "${
this.taskInstance.params.alertId
}": has active maintenance windows ${alertMaintenanceWindowIds.join(', ')}.`
);
continue;
}
if (alert.isFilteredOut(summarizedAlerts)) {
continue;
}
const actionGroup = this.getActionGroup(alert);
if (!this.ruleTypeActionGroups!.has(actionGroup)) {
this.logger.error(
`Invalid action group "${actionGroup}" for rule "${this.ruleType.id}".`
);
continue;
}
// only actions with notifyWhen set to "on status change" should return
// notifications for flapping pending recovered alerts
if (
alert.getPendingRecoveredCount() > 0 &&
action?.frequency?.notifyWhen !== RuleNotifyWhen.CHANGE
) {
continue;
}
if (summarizedAlerts) {
const alertAsData = summarizedAlerts.all.data.find(
(alertHit: AlertHit) => alertHit._id === alert.getUuid()
);
if (alertAsData) {
alert.setAlertAsData(alertAsData);
}
}
if (action.group === actionGroup && !this.isAlertMuted(alertId)) {
if (
this.isRecoveredAlert(action.group) ||
this.isExecutableActiveAlert({ alert, action })
) {
executables.push({ action, alert });
}
}
}
}
if (!this.canGetSummarizedAlerts()) {
return executables;
}
for (const systemAction of this.rule?.systemActions ?? []) {
const summarizedAlerts = await this.getSummarizedAlerts({
action: systemAction,
spaceId: this.taskInstance.params.spaceId,
ruleId: this.taskInstance.params.alertId,
});
if (summarizedAlerts && summarizedAlerts.all.count !== 0) {
executables.push({ action: systemAction, summarizedAlerts });
}
}
return executables;
}
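/** True when the rule type registers alerts-as-data and the alerts client supports summarized alerts. */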
private canGetSummarizedAlerts() {
return !!this.ruleType.alerts && !!this.alertsClient.getSummarizedAlerts;
}
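/**
 * Determines whether summarized alerts should be fetched for an action: required for actions
 * that use alert data in templates or define an alerts filter, and for summary actions that
 * are not currently throttled.
 */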
private shouldGetSummarizedAlerts({
action,
throttledSummaryActions,
}: {
action: RuleAction;
throttledSummaryActions: ThrottledActions;
}) {
if (!this.canGetSummarizedAlerts()) {
if (action.frequency?.summary) {
this.logger.error(
`Skipping action "${action.id}" for rule "${this.rule.id}" because the rule type "${this.ruleType.name}" does not support alert-as-data.`
);
}
return false;
}
if (action.useAlertDataForTemplate) {
return true;
}
// we fetch summarizedAlerts to filter alerts in memory as well
if (!isSummaryAction(action) && !action.alertsFilter) {
return false;
}
if (
isSummaryAction(action) &&
isSummaryActionThrottled({
action,
throttledSummaryActions,
logger: this.logger,
})
) {
return false;
}
return true;
}
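/**
 * Fetches summarized alerts for either the action's throttle window or the current execution,
 * then drops new alerts whose in-memory counterparts have maintenance window ids and
 * recalculates the "new" and "all" counts.
 */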
private async getSummarizedAlerts({
action,
ruleId,
spaceId,
}: {
action: RuleAction | RuleSystemAction;
ruleId: string;
spaceId: string;
}): Promise<CombinedSummarizedAlerts> {
const optionsBase = {
ruleId,
spaceId,
excludedAlertInstanceIds: this.rule.mutedInstanceIds,
alertsFilter: this.isSystemAction(action) ? undefined : (action as RuleAction).alertsFilter,
};
let options: GetSummarizedAlertsParams;
if (!this.isSystemAction(action) && isActionOnInterval(action)) {
const throttleMillis = parseDuration((action as RuleAction).frequency!.throttle!);
const start = new Date(Date.now() - throttleMillis);
options = {
...optionsBase,
start,
end: new Date(),
};
} else {
options = {
...optionsBase,
executionUuid: this.executionId,
};
}
let alerts;
try {
alerts = await withAlertingSpan(`alerting:get-summarized-alerts-${action.uuid}`, () =>
this.alertsClient.getSummarizedAlerts!(options)
);
} catch (e) {
throw createTaskRunError(e, TaskErrorSource.FRAMEWORK);
}
/**
* We need to remove all new alerts with maintenance windows retrieved from
* getSummarizedAlerts because they might not have maintenance window IDs
* associated with them from maintenance windows with scoped query updated
* yet (the update call uses refresh: false). So we need to rely on the in
* memory alerts to do this.
*/
const newAlertsInMemory =
Object.values(this.alertsClient.getProcessedAlerts('new') || {}) || [];
const newAlertsWithMaintenanceWindowIds = newAlertsInMemory.reduce<string[]>(
(result, alert) => {
if (alert.getMaintenanceWindowIds().length > 0) {
result.push(alert.getUuid());
}
return result;
},
[]
);
const newAlerts = alerts.new.data.filter((alert) => {
return !newAlertsWithMaintenanceWindowIds.includes(alert[ALERT_UUID]);
});
const total = newAlerts.length + alerts.ongoing.count + alerts.recovered.count;
return {
...alerts,
new: {
count: newAlerts.length,
data: newAlerts,
},
all: {
count: total,
data: [...newAlerts, ...alerts.ongoing.data, ...alerts.recovered.data],
},
};
}
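/**
 * Executes the action as an ephemeral task when supported and capacity remains, falling back
 * to the bulk queue on capacity errors; otherwise adds the action to the bulk queue.
 */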
private async actionRunOrAddToBulk({
enqueueOptions,
bulkActions,
}: {
enqueueOptions: EnqueueExecutionOptions;
bulkActions: EnqueueExecutionOptions[];
}) {
if (this.taskRunnerContext.supportsEphemeralTasks && this.ephemeralActionsToSchedule > 0) {
this.ephemeralActionsToSchedule--;
try {
await this.actionsClient!.ephemeralEnqueuedExecution(enqueueOptions);
} catch (err) {
if (isEphemeralTaskRejectedDueToCapacityError(err)) {
bulkActions.push(enqueueOptions);
}
}
} else {
bulkActions.push(enqueueOptions);
}
}
}

View file

@ -7,7 +7,7 @@
import { i18n } from '@kbn/i18n';
import { RuleActionParams } from '../types';
-import { RuleUrl } from './execution_handler';
+import { RuleUrl } from './action_scheduler';
export interface InjectActionParamsOpts {
actionTypeId: string;

View file

@ -18,7 +18,7 @@ import {
} from '@kbn/task-manager-plugin/server';
import { nanosToMillis } from '@kbn/event-log-plugin/server';
import { getErrorSource, isUserError } from '@kbn/task-manager-plugin/server/task_running';
-import { ExecutionHandler, RunResult } from './execution_handler';
+import { ActionScheduler, type RunResult } from './action_scheduler';
import {
RuleRunnerErrorStackTraceLog,
RuleTaskInstance,
@ -381,7 +381,7 @@ export class TaskRunner<
throw error;
}
-const executionHandler = new ExecutionHandler({
+const actionScheduler = new ActionScheduler({
rule,
ruleType: this.ruleType,
logger: this.logger,
@ -398,7 +398,7 @@ export class TaskRunner<
alertsClient,
});
-let executionHandlerRunResult: RunResult = { throttledSummaryActions: {} };
+let actionSchedulerResult: RunResult = { throttledSummaryActions: {} };
await withAlertingSpan('alerting:schedule-actions', () =>
this.timer.runWithTimer(TaskRunnerTimerSpan.TriggerActions, async () => {
@ -410,7 +410,7 @@ export class TaskRunner<
);
this.countUsageOfActionExecutionAfterRuleCancellation();
} else {
-executionHandlerRunResult = await executionHandler.run({
+actionSchedulerResult = await actionScheduler.run({
...alertsClient.getProcessedAlerts('activeCurrent'),
...alertsClient.getProcessedAlerts('recoveredCurrent'),
});
@ -435,7 +435,7 @@ export class TaskRunner<
alertTypeState: updatedRuleTypeState || undefined,
alertInstances: alertsToReturn,
alertRecoveredInstances: recoveredAlertsToReturn,
-summaryActions: executionHandlerRunResult.throttledSummaryActions,
+summaryActions: actionSchedulerResult.throttledSummaryActions,
};
}

View file

@ -83,9 +83,8 @@ export interface RuleTaskInstance extends ConcreteTaskInstance {
state: RuleTaskState;
}
-// ExecutionHandler
-export interface ExecutionHandlerOptions<
+// ActionScheduler
+export interface ActionSchedulerOptions<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,

View file

@ -70,7 +70,8 @@
"@kbn/react-kibana-context-render",
"@kbn/search-types",
"@kbn/alerting-state-types",
"@kbn/core-security-server"
"@kbn/core-security-server",
"@kbn/core-http-server"
],
"exclude": [
"target/**/*"