[Response Ops] [Alerting] Adds circuit breaker that limits the max number of alerts that can be reported each rule run. (#138446)

* Adding config for max alerts. Throw error in alert factory when max alerts are reported. Process alerts differently when alert limit is reached

* Fixing types. Adding new config to docker and docs

* Setting flag to indicate limit reached. Update rule status to warning if limit reached

* Fixing task runner test

* Cleanup

* Fixing task runner test

* Actually using result of hasReachedAlertLimit

* Fixing types

* Copying over scheduled actions for active alerts. Only execute actions if they exist

* Setting lower limit in functional test config

* Adding functional test

* Update x-pack/plugins/alerting/server/constants/translations.ts

Co-authored-by: Lisa Cawley <lcawley@elastic.co>

* PR feedback

Co-authored-by: Lisa Cawley <lcawley@elastic.co>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Authored by Ying Mao on 2022-08-16 10:36:38 -04:00; committed by GitHub
parent 9631649e72
commit db72db9e7c
39 changed files with 985 additions and 212 deletions
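
For illustration only (not part of this commit): a minimal sketch of how a rule executor interacts with the new circuit breaker. `alertFactory.create()` throws once more alerts than `xpack.alerting.rules.run.alerts.max` are reported in a single run, and `alertFactory.hasReachedAlertLimit()` indicates whether that happened. The local `AlertFactoryLike` type and the alert ids are illustrative stand-ins for the real interfaces changed below.

// Minimal local mirror of the alertFactory surface this commit exposes (illustrative).
interface AlertFactoryLike {
  create: (id: string) => { scheduleActions: (actionGroup: string, context?: object) => void };
  hasReachedAlertLimit: () => boolean;
}

// Hypothetical reporting loop inside a rule executor.
function reportAlerts(alertFactory: AlertFactoryLike, detectedGroups: string[]) {
  for (const group of detectedGroups) {
    try {
      // Throws "Rule reported more than <max> alerts." once the per-run limit is exceeded.
      alertFactory.create(`group-${group}`).scheduleActions('default', { group });
    } catch (err) {
      if (alertFactory.hasReachedAlertLimit()) {
        // Stop reporting; the task runner records a "maxAlerts" warning instead of failing the run.
        break;
      }
      throw err;
    }
  }
}

Because existing alerts are carried forward as active when the limit is hit, a rule that briefly exceeds the limit does not spuriously recover alerts it could not evaluate in that run.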

View file

@ -226,6 +226,9 @@ Specifies the behavior when a new or changed rule has a schedule interval less t
`xpack.alerting.rules.run.actions.max` {ess-icon}::
Specifies the maximum number of actions that a rule can generate each time detection checks run.
`xpack.alerting.rules.run.alerts.max` {ess-icon}::
Specifies the maximum number of alerts that a rule can generate each time detection checks run. Default: 1000.
`xpack.alerting.rules.run.timeout` {ess-icon}::
Specifies the default timeout for tasks associated with all types of rules. The time is formatted as:
+

View file

@ -221,6 +221,7 @@ kibana_vars=(
xpack.alerting.rules.minimumScheduleInterval.value
xpack.alerting.rules.minimumScheduleInterval.enforce
xpack.alerting.rules.run.actions.max
xpack.alerting.rules.run.alerts.max
xpack.alerting.rules.run.actions.connectorTypeOverrides
xpack.alerts.healthCheck.interval
xpack.alerts.invalidateApiKeysTask.interval

View file

@ -45,6 +45,7 @@ export enum RuleExecutionStatusErrorReasons {
export enum RuleExecutionStatusWarningReasons {
MAX_EXECUTABLE_ACTIONS = 'maxExecutableActions',
MAX_ALERTS = 'maxAlerts',
}
export interface RuleExecutionStatus {

View file

@ -29,6 +29,7 @@ describe('createAlertFactory()', () => {
const alertFactory = createAlertFactory({
alerts: {},
logger,
maxAlerts: 1000,
});
const result = alertFactory.create('1');
expect(result).toMatchInlineSnapshot(`
@ -51,6 +52,7 @@ describe('createAlertFactory()', () => {
'1': alert,
},
logger,
maxAlerts: 1000,
});
const result = alertFactory.create('1');
expect(result).toMatchInlineSnapshot(`
@ -73,6 +75,7 @@ describe('createAlertFactory()', () => {
const alertFactory = createAlertFactory({
alerts,
logger,
maxAlerts: 1000,
});
alertFactory.create('1');
expect(alerts).toMatchInlineSnapshot(`
@ -85,10 +88,30 @@ describe('createAlertFactory()', () => {
`);
});
test('throws error and sets flag when more alerts are created than allowed', () => {
const alertFactory = createAlertFactory({
alerts: {},
logger,
maxAlerts: 3,
});
expect(alertFactory.hasReachedAlertLimit()).toBe(false);
alertFactory.create('1');
alertFactory.create('2');
alertFactory.create('3');
expect(() => {
alertFactory.create('4');
}).toThrowErrorMatchingInlineSnapshot(`"Rule reported more than 3 alerts."`);
expect(alertFactory.hasReachedAlertLimit()).toBe(true);
});
test('throws error when creating alerts after done() is called', () => {
const alertFactory = createAlertFactory({
alerts: {},
logger,
maxAlerts: 1000,
});
const result = alertFactory.create('1');
expect(result).toEqual({
@ -127,6 +150,7 @@ describe('createAlertFactory()', () => {
alerts: {},
logger,
canSetRecoveryContext: true,
maxAlerts: 1000,
});
const result = alertFactory.create('1');
expect(result).toEqual({
@ -149,6 +173,7 @@ describe('createAlertFactory()', () => {
const alertFactory = createAlertFactory({
alerts: {},
logger,
maxAlerts: 1000,
canSetRecoveryContext: true,
});
const result = alertFactory.create('1');
@ -171,6 +196,7 @@ describe('createAlertFactory()', () => {
const alertFactory = createAlertFactory({
alerts: {},
logger,
maxAlerts: 1000,
canSetRecoveryContext: true,
});
const result = alertFactory.create('1');
@ -192,6 +218,7 @@ describe('createAlertFactory()', () => {
const alertFactory = createAlertFactory({
alerts: {},
logger,
maxAlerts: 1000,
canSetRecoveryContext: false,
});
const result = alertFactory.create('1');

View file

@ -25,6 +25,7 @@ export interface CreateAlertFactoryOpts<
> {
alerts: Record<string, Alert<State, Context>>;
logger: Logger;
maxAlerts: number;
canSetRecoveryContext?: boolean;
}
@ -32,21 +33,40 @@ export function createAlertFactory<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string
>({ alerts, logger, canSetRecoveryContext = false }: CreateAlertFactoryOpts<State, Context>) {
>({
alerts,
logger,
maxAlerts,
canSetRecoveryContext = false,
}: CreateAlertFactoryOpts<State, Context>) {
// Keep track of which alerts we started with so we can determine which have recovered
const originalAlerts = cloneDeep(alerts);
// Number of alerts reported
let numAlertsCreated = 0;
// Whether the number of alerts reported has reached max allowed
let hasReachedAlertLimit = false;
let isDone = false;
return {
create: (id: string): PublicAlert<State, Context, ActionGroupIds> => {
if (isDone) {
throw new Error(`Can't create new alerts after calling done() in AlertsFactory.`);
}
if (numAlertsCreated++ >= maxAlerts) {
hasReachedAlertLimit = true;
throw new Error(`Rule reported more than ${maxAlerts} alerts.`);
}
if (!alerts[id]) {
alerts[id] = new Alert<State, Context>(id);
}
return alerts[id];
},
hasReachedAlertLimit: (): boolean => hasReachedAlertLimit,
done: (): AlertFactoryDoneUtils<State, Context, ActionGroupIds> => {
isDone = true;
return {
@ -59,8 +79,12 @@ export function createAlertFactory<
}
const { recoveredAlerts } = processAlerts<State, Context, ActionGroupIds, ActionGroupIds>(
alerts,
originalAlerts
{
alerts,
existingAlerts: originalAlerts,
hasReachedAlertLimit,
alertLimit: maxAlerts,
}
);
return Object.keys(recoveredAlerts ?? {}).map(
(alertId: string) => recoveredAlerts[alertId]

View file

@ -30,6 +30,9 @@ describe('config validation', () => {
"actions": Object {
"max": 100000,
},
"alerts": Object {
"max": 1000,
},
},
},
}

View file

@ -43,6 +43,9 @@ const rulesSchema = schema.object({
max: schema.number({ defaultValue: 100000, max: 100000 }),
connectorTypeOverrides: schema.maybe(schema.arrayOf(connectorTypeSchema)),
}),
alerts: schema.object({
max: schema.number({ defaultValue: 1000 }),
}),
ruleTypeOverrides: schema.maybe(schema.arrayOf(ruleTypeSchema)),
}),
});
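
For illustration only: how the new default behaves with `@kbn/config-schema`, trimmed to the relevant piece of the schema above.

import { schema } from '@kbn/config-schema';

// Mirrors the `alerts` block added to the rules `run` schema in this commit.
const alertsSchema = schema.object({
  max: schema.number({ defaultValue: 1000 }),
});

// Omitting the value yields the documented default of 1000 alerts per rule run.
alertsSchema.validate({}); // { max: 1000 }
// An explicit value overrides it, as `xpack.alerting.rules.run.alerts.max=20` does in the functional test config below.
alertsSchema.validate({ max: 20 }); // { max: 20 }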

View file

@ -17,6 +17,10 @@ export const translations = {
'The maximum number of actions for this rule type was reached; excess actions were not triggered.',
}
),
maxAlerts: i18n.translate('xpack.alerting.taskRunner.warning.maxAlerts', {
defaultMessage:
'Rule reported more than the maximum number of alerts in a single run. Alerts may be missed and recovery notifications may be delayed',
}),
},
},
};

View file

@ -653,6 +653,7 @@ describe('AlertingEventLogger', () => {
numSearches: 6,
esSearchDurationMs: 3300,
totalSearchDurationMs: 10333,
hasReachedAlertLimit: false,
triggeredActionsStatus: ActionsCompletion.COMPLETE,
},
});

View file

@ -37,6 +37,7 @@ const configWithoutTimeout = {
...config,
run: {
actions: { max: 1000 },
alerts: { max: 1000 },
},
};

View file

@ -30,13 +30,13 @@ describe('processAlerts', () => {
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('2');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('3');
const originalAlerts = {
const existingAlerts = {
'2': existingAlert1,
'3': existingAlert2,
};
const updatedAlerts = {
...cloneDeep(originalAlerts),
...cloneDeep(existingAlerts),
'1': newAlert,
};
@ -44,8 +44,14 @@ describe('processAlerts', () => {
updatedAlerts['2'].scheduleActions('default', { foo: '1' });
updatedAlerts['3'].scheduleActions('default', { foo: '2' });
// @ts-expect-error
const { newAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { newAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(newAlerts).toEqual({ '1': newAlert });
});
@ -56,13 +62,13 @@ describe('processAlerts', () => {
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('3');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('4');
const originalAlerts = {
const existingAlerts = {
'3': existingAlert1,
'4': existingAlert2,
};
const updatedAlerts = {
...cloneDeep(originalAlerts),
...cloneDeep(existingAlerts),
'1': newAlert1,
'2': newAlert2,
};
@ -75,8 +81,14 @@ describe('processAlerts', () => {
expect(newAlert1.getState()).toStrictEqual({});
expect(newAlert2.getState()).toStrictEqual({});
// @ts-expect-error
const { newAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { newAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(newAlerts).toEqual({ '1': newAlert1, '2': newAlert2 });
@ -106,13 +118,13 @@ describe('processAlerts', () => {
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('2');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('3');
const originalAlerts = {
const existingAlerts = {
'2': existingAlert1,
'3': existingAlert2,
};
const updatedAlerts = {
...cloneDeep(originalAlerts),
...cloneDeep(existingAlerts),
'1': newAlert,
};
@ -120,8 +132,14 @@ describe('processAlerts', () => {
updatedAlerts['2'].scheduleActions('default', { foo: '1' });
updatedAlerts['3'].scheduleActions('default', { foo: '2' });
// @ts-expect-error
const { activeAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { activeAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(activeAlerts).toEqual({
'1': updatedAlerts['1'],
@ -135,15 +153,15 @@ describe('processAlerts', () => {
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('2');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('3');
const originalAlerts = {
const existingAlerts = {
'2': existingAlert1,
'3': existingAlert2,
};
originalAlerts['2'].replaceState({ start: '1969-12-30T00:00:00.000Z', duration: 33000 });
originalAlerts['3'].replaceState({ start: '1969-12-31T07:34:00.000Z', duration: 23532 });
existingAlerts['2'].replaceState({ start: '1969-12-30T00:00:00.000Z', duration: 33000 });
existingAlerts['3'].replaceState({ start: '1969-12-31T07:34:00.000Z', duration: 23532 });
const updatedAlerts = {
...cloneDeep(originalAlerts),
...cloneDeep(existingAlerts),
'1': newAlert,
};
@ -151,8 +169,14 @@ describe('processAlerts', () => {
updatedAlerts['2'].scheduleActions('default', { foo: '1' });
updatedAlerts['3'].scheduleActions('default', { foo: '2' });
// @ts-expect-error
const { activeAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { activeAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(activeAlerts).toEqual({
'1': updatedAlerts['1'],
@ -184,13 +208,13 @@ describe('processAlerts', () => {
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('2');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('3');
const originalAlerts = {
const existingAlerts = {
'2': existingAlert1,
'3': existingAlert2,
};
const updatedAlerts = {
...cloneDeep(originalAlerts),
...cloneDeep(existingAlerts),
'1': newAlert,
};
@ -198,8 +222,14 @@ describe('processAlerts', () => {
updatedAlerts['2'].scheduleActions('default', { foo: '1' });
updatedAlerts['3'].scheduleActions('default', { foo: '2' });
// @ts-expect-error
const { activeAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { activeAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(activeAlerts).toEqual({
'1': updatedAlerts['1'],
@ -231,23 +261,23 @@ describe('processAlerts', () => {
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('2');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('3');
const originalAlerts = {
const existingAlerts = {
'2': existingAlert1,
'3': existingAlert2,
};
originalAlerts['2'].replaceState({
existingAlerts['2'].replaceState({
stateField1: 'xyz',
start: '1969-12-30T00:00:00.000Z',
duration: 33000,
});
originalAlerts['3'].replaceState({
existingAlerts['3'].replaceState({
anotherState: true,
start: '1969-12-31T07:34:00.000Z',
duration: 23532,
});
const updatedAlerts = {
...cloneDeep(originalAlerts),
...cloneDeep(existingAlerts),
'1': newAlert,
};
@ -255,8 +285,14 @@ describe('processAlerts', () => {
updatedAlerts['2'].scheduleActions('default', { foo: '1' });
updatedAlerts['3'].scheduleActions('default', { foo: '2' });
// @ts-expect-error
const { activeAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { activeAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(activeAlerts).toEqual({
'1': updatedAlerts['1'],
@ -294,18 +330,24 @@ describe('processAlerts', () => {
const activeAlert = new Alert<{}, {}, DefaultActionGroupId>('1');
const recoveredAlert = new Alert<{}, {}, DefaultActionGroupId>('2');
const originalAlerts = {
const existingAlerts = {
'1': activeAlert,
'2': recoveredAlert,
};
const updatedAlerts = cloneDeep(originalAlerts);
const updatedAlerts = cloneDeep(existingAlerts);
updatedAlerts['1'].scheduleActions('default', { foo: '1' });
updatedAlerts['2'].setContext({ foo: '2' });
// @ts-expect-error
const { recoveredAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { recoveredAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(recoveredAlerts).toEqual({ '2': updatedAlerts['2'] });
});
@ -314,19 +356,25 @@ describe('processAlerts', () => {
const activeAlert = new Alert<{}, {}, DefaultActionGroupId>('1');
const notRecoveredAlert = new Alert<{}, {}, DefaultActionGroupId>('2');
const originalAlerts = {
const existingAlerts = {
'1': activeAlert,
};
const updatedAlerts = {
...cloneDeep(originalAlerts),
...cloneDeep(existingAlerts),
'2': notRecoveredAlert,
};
updatedAlerts['1'].scheduleActions('default', { foo: '1' });
// @ts-expect-error
const { recoveredAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { recoveredAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(recoveredAlerts).toEqual({});
});
@ -336,20 +384,26 @@ describe('processAlerts', () => {
const recoveredAlert1 = new Alert<{}, {}, DefaultActionGroupId>('2');
const recoveredAlert2 = new Alert<{}, {}, DefaultActionGroupId>('3');
const originalAlerts = {
const existingAlerts = {
'1': activeAlert,
'2': recoveredAlert1,
'3': recoveredAlert2,
};
originalAlerts['2'].replaceState({ start: '1969-12-30T00:00:00.000Z', duration: 33000 });
originalAlerts['3'].replaceState({ start: '1969-12-31T07:34:00.000Z', duration: 23532 });
existingAlerts['2'].replaceState({ start: '1969-12-30T00:00:00.000Z', duration: 33000 });
existingAlerts['3'].replaceState({ start: '1969-12-31T07:34:00.000Z', duration: 23532 });
const updatedAlerts = cloneDeep(originalAlerts);
const updatedAlerts = cloneDeep(existingAlerts);
updatedAlerts['1'].scheduleActions('default', { foo: '1' });
// @ts-expect-error
const { recoveredAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { recoveredAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(recoveredAlerts).toEqual({ '2': updatedAlerts['2'], '3': updatedAlerts['3'] });
@ -377,17 +431,23 @@ describe('processAlerts', () => {
const recoveredAlert1 = new Alert<{}, {}, DefaultActionGroupId>('2');
const recoveredAlert2 = new Alert<{}, {}, DefaultActionGroupId>('3');
const originalAlerts = {
const existingAlerts = {
'1': activeAlert,
'2': recoveredAlert1,
'3': recoveredAlert2,
};
const updatedAlerts = cloneDeep(originalAlerts);
const updatedAlerts = cloneDeep(existingAlerts);
updatedAlerts['1'].scheduleActions('default', { foo: '1' });
// @ts-expect-error
const { recoveredAlerts } = processAlerts(updatedAlerts, originalAlerts);
const { recoveredAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: false,
alertLimit: 10,
});
expect(recoveredAlerts).toEqual({ '2': updatedAlerts['2'], '3': updatedAlerts['3'] });
@ -410,4 +470,156 @@ describe('processAlerts', () => {
expect(recoveredAlert2State.end).not.toBeDefined();
});
});
describe('when hasReachedAlertLimit is true', () => {
test('does not calculate recovered alerts', () => {
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('1');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('2');
const existingAlert3 = new Alert<{}, {}, DefaultActionGroupId>('3');
const existingAlert4 = new Alert<{}, {}, DefaultActionGroupId>('4');
const existingAlert5 = new Alert<{}, {}, DefaultActionGroupId>('5');
const newAlert6 = new Alert<{}, {}, DefaultActionGroupId>('6');
const newAlert7 = new Alert<{}, {}, DefaultActionGroupId>('7');
const existingAlerts = {
'1': existingAlert1,
'2': existingAlert2,
'3': existingAlert3,
'4': existingAlert4,
'5': existingAlert5,
};
const updatedAlerts = {
...cloneDeep(existingAlerts),
'6': newAlert6,
'7': newAlert7,
};
updatedAlerts['1'].scheduleActions('default', { foo: '1' });
updatedAlerts['2'].scheduleActions('default', { foo: '1' });
updatedAlerts['3'].scheduleActions('default', { foo: '2' });
updatedAlerts['4'].scheduleActions('default', { foo: '2' });
// intentionally not scheduling actions for alert "5"
updatedAlerts['6'].scheduleActions('default', { foo: '2' });
updatedAlerts['7'].scheduleActions('default', { foo: '2' });
const { recoveredAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: true,
alertLimit: 7,
});
expect(recoveredAlerts).toEqual({});
});
test('persists existing alerts', () => {
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('1');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('2');
const existingAlert3 = new Alert<{}, {}, DefaultActionGroupId>('3');
const existingAlert4 = new Alert<{}, {}, DefaultActionGroupId>('4');
const existingAlert5 = new Alert<{}, {}, DefaultActionGroupId>('5');
const existingAlerts = {
'1': existingAlert1,
'2': existingAlert2,
'3': existingAlert3,
'4': existingAlert4,
'5': existingAlert5,
};
const updatedAlerts = cloneDeep(existingAlerts);
updatedAlerts['1'].scheduleActions('default', { foo: '1' });
updatedAlerts['2'].scheduleActions('default', { foo: '1' });
updatedAlerts['3'].scheduleActions('default', { foo: '2' });
updatedAlerts['4'].scheduleActions('default', { foo: '2' });
// intentionally not scheduling actions for alert "5"
const { activeAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: true,
alertLimit: 7,
});
expect(activeAlerts).toEqual({
'1': updatedAlerts['1'],
'2': updatedAlerts['2'],
'3': updatedAlerts['3'],
'4': updatedAlerts['4'],
'5': existingAlert5,
});
});
test('adds new alerts up to max allowed', () => {
const MAX_ALERTS = 7;
const existingAlert1 = new Alert<{}, {}, DefaultActionGroupId>('1');
const existingAlert2 = new Alert<{}, {}, DefaultActionGroupId>('2');
const existingAlert3 = new Alert<{}, {}, DefaultActionGroupId>('3');
const existingAlert4 = new Alert<{}, {}, DefaultActionGroupId>('4');
const existingAlert5 = new Alert<{}, {}, DefaultActionGroupId>('5');
const newAlert6 = new Alert<{}, {}, DefaultActionGroupId>('6');
const newAlert7 = new Alert<{}, {}, DefaultActionGroupId>('7');
const newAlert8 = new Alert<{}, {}, DefaultActionGroupId>('8');
const newAlert9 = new Alert<{}, {}, DefaultActionGroupId>('9');
const newAlert10 = new Alert<{}, {}, DefaultActionGroupId>('10');
const existingAlerts = {
'1': existingAlert1,
'2': existingAlert2,
'3': existingAlert3,
'4': existingAlert4,
'5': existingAlert5,
};
const updatedAlerts = {
...cloneDeep(existingAlerts),
'6': newAlert6,
'7': newAlert7,
'8': newAlert8,
'9': newAlert9,
'10': newAlert10,
};
updatedAlerts['1'].scheduleActions('default', { foo: '1' });
updatedAlerts['2'].scheduleActions('default', { foo: '1' });
updatedAlerts['3'].scheduleActions('default', { foo: '2' });
updatedAlerts['4'].scheduleActions('default', { foo: '2' });
// intentionally not scheduling actions for alert "5"
updatedAlerts['6'].scheduleActions('default', { foo: '2' });
updatedAlerts['7'].scheduleActions('default', { foo: '2' });
updatedAlerts['8'].scheduleActions('default', { foo: '2' });
updatedAlerts['9'].scheduleActions('default', { foo: '2' });
updatedAlerts['10'].scheduleActions('default', { foo: '2' });
const { activeAlerts, newAlerts } = processAlerts({
// @ts-expect-error
alerts: updatedAlerts,
// @ts-expect-error
existingAlerts,
hasReachedAlertLimit: true,
alertLimit: MAX_ALERTS,
});
expect(Object.keys(activeAlerts).length).toEqual(MAX_ALERTS);
expect(activeAlerts).toEqual({
'1': updatedAlerts['1'],
'2': updatedAlerts['2'],
'3': updatedAlerts['3'],
'4': updatedAlerts['4'],
'5': existingAlert5,
'6': newAlert6,
'7': newAlert7,
});
expect(newAlerts).toEqual({
'6': newAlert6,
'7': newAlert7,
});
});
});
});

View file

@ -6,9 +6,19 @@
*/
import { millisToNanos } from '@kbn/event-log-plugin/server';
import { cloneDeep } from 'lodash';
import { Alert } from '../alert';
import { AlertInstanceState, AlertInstanceContext } from '../types';
interface ProcessAlertsOpts<
State extends AlertInstanceState,
Context extends AlertInstanceContext
> {
alerts: Record<string, Alert<State, Context>>;
existingAlerts: Record<string, Alert<State, Context>>;
hasReachedAlertLimit: boolean;
alertLimit: number;
}
interface ProcessAlertsResult<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
@ -25,11 +35,32 @@ export function processAlerts<
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string
>({
alerts,
existingAlerts,
hasReachedAlertLimit,
alertLimit,
}: ProcessAlertsOpts<State, Context>): ProcessAlertsResult<
State,
Context,
ActionGroupIds,
RecoveryActionGroupId
> {
return hasReachedAlertLimit
? processAlertsLimitReached(alerts, existingAlerts, alertLimit)
: processAlertsHelper(alerts, existingAlerts);
}
function processAlertsHelper<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string
>(
alerts: Record<string, Alert<State, Context>>,
originalAlerts: Record<string, Alert<State, Context>>
existingAlerts: Record<string, Alert<State, Context>>
): ProcessAlertsResult<State, Context, ActionGroupIds, RecoveryActionGroupId> {
const originalAlertIds = new Set(Object.keys(originalAlerts));
const existingAlertIds = new Set(Object.keys(existingAlerts));
const currentTime = new Date().toISOString();
const newAlerts: Record<string, Alert<State, Context, ActionGroupIds>> = {};
@ -43,7 +74,7 @@ export function processAlerts<
activeAlerts[id] = alerts[id];
// if this alert did not exist in previous run, it is considered "new"
if (!originalAlertIds.has(id)) {
if (!existingAlertIds.has(id)) {
newAlerts[id] = alerts[id];
// Inject start time into alert state for new alerts
@ -52,7 +83,7 @@ export function processAlerts<
} else {
// this alert did exist in previous run
// calculate duration to date for active alerts
const state = originalAlerts[id].getState();
const state = existingAlerts[id].getState();
const durationInMs =
new Date(currentTime).valueOf() - new Date(state.start as string).valueOf();
const duration = state.start ? millisToNanos(durationInMs) : undefined;
@ -62,7 +93,7 @@ export function processAlerts<
...(duration !== undefined ? { duration } : {}),
});
}
} else if (originalAlertIds.has(id)) {
} else if (existingAlertIds.has(id)) {
recoveredAlerts[id] = alerts[id];
// Inject end time into alert state of recovered alerts
@ -80,3 +111,76 @@ export function processAlerts<
}
return { recoveredAlerts, newAlerts, activeAlerts };
}
function processAlertsLimitReached<
State extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string
>(
alerts: Record<string, Alert<State, Context>>,
existingAlerts: Record<string, Alert<State, Context>>,
alertLimit: number
): ProcessAlertsResult<State, Context, ActionGroupIds, RecoveryActionGroupId> {
const existingAlertIds = new Set(Object.keys(existingAlerts));
// When the alert limit has been reached,
// - skip determination of recovered alerts
// - pass through all existing alerts as active
// - add any new alerts, up to the max allowed
const currentTime = new Date().toISOString();
const newAlerts: Record<string, Alert<State, Context, ActionGroupIds>> = {};
// all existing alerts stay active
const activeAlerts: Record<string, Alert<State, Context, ActionGroupIds>> = cloneDeep(
existingAlerts
);
// update duration for existing alerts
for (const id in activeAlerts) {
if (activeAlerts.hasOwnProperty(id)) {
if (alerts.hasOwnProperty(id)) {
activeAlerts[id] = alerts[id];
}
const state = existingAlerts[id].getState();
const durationInMs =
new Date(currentTime).valueOf() - new Date(state.start as string).valueOf();
const duration = state.start ? millisToNanos(durationInMs) : undefined;
activeAlerts[id].replaceState({
...state,
...(state.start ? { start: state.start } : {}),
...(duration !== undefined ? { duration } : {}),
});
}
}
function hasCapacityForNewAlerts() {
return Object.keys(activeAlerts).length < alertLimit;
}
// if we don't have capacity for new alerts, return
if (!hasCapacityForNewAlerts()) {
return { recoveredAlerts: {}, newAlerts: {}, activeAlerts };
}
// look for new alerts and add until we hit capacity
for (const id in alerts) {
if (alerts.hasOwnProperty(id) && alerts[id].hasScheduledActions()) {
// if this alert did not exist in previous run, it is considered "new"
if (!existingAlertIds.has(id)) {
activeAlerts[id] = alerts[id];
newAlerts[id] = alerts[id];
// Inject start time into alert state for new alerts
const state = newAlerts[id].getState();
newAlerts[id].replaceState({ ...state, start: currentTime, duration: '0' });
if (!hasCapacityForNewAlerts()) {
break;
}
}
}
}
return { recoveredAlerts: {}, newAlerts, activeAlerts };
}
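
For illustration only: a condensed sketch of the limit-reached path, mirroring the unit tests above. Import paths are relative to this file, and the `// @ts-expect-error` comments follow the same generics shortcut the tests use.

import { Alert } from '../alert';
import { processAlerts } from './process_alerts';

// One alert carried over from the previous run plus two newly reported alerts.
const existingAlerts = { '1': new Alert<{}, {}, 'default'>('1') };
const alerts = {
  '1': new Alert<{}, {}, 'default'>('1'),
  '2': new Alert<{}, {}, 'default'>('2'),
  '3': new Alert<{}, {}, 'default'>('3'),
};
Object.values(alerts).forEach((alert) => alert.scheduleActions('default', { foo: '1' }));

const { activeAlerts, newAlerts, recoveredAlerts } = processAlerts({
  // @ts-expect-error
  alerts,
  // @ts-expect-error
  existingAlerts,
  hasReachedAlertLimit: true,
  alertLimit: 2,
});

// Existing alerts stay active, new alerts are admitted only until the limit is filled,
// and recovery detection is skipped entirely.
Object.keys(activeAlerts); // ['1', '2']
Object.keys(newAlerts); // ['2']
Object.keys(recoveredAlerts); // []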

View file

@ -31,6 +31,7 @@ const executionMetrics = {
numberOfActiveAlerts: 2,
numberOfNewAlerts: 3,
numberOfRecoveredAlerts: 13,
hasReachedAlertLimit: false,
triggeredActionsStatus: ActionsCompletion.COMPLETE,
};
@ -48,6 +49,7 @@ describe('RuleExecutionStatus', () => {
expect(received.numberOfActiveAlerts).toEqual(expected.numberOfActiveAlerts);
expect(received.numberOfRecoveredAlerts).toEqual(expected.numberOfRecoveredAlerts);
expect(received.numberOfNewAlerts).toEqual(expected.numberOfNewAlerts);
expect(received.hasReachedAlertLimit).toEqual(expected.hasReachedAlertLimit);
expect(received.triggeredActionsStatus).toEqual(expected.triggeredActionsStatus);
}
@ -89,7 +91,7 @@ describe('RuleExecutionStatus', () => {
testExpectedMetrics(metrics!, executionMetrics);
});
test('task state with warning', () => {
test('task state with max executable actions warning', () => {
const { status, metrics } = executionStatusFromState({
alertInstances: { a: {} },
metrics: { ...executionMetrics, triggeredActionsStatus: ActionsCompletion.PARTIAL },
@ -107,6 +109,25 @@ describe('RuleExecutionStatus', () => {
triggeredActionsStatus: ActionsCompletion.PARTIAL,
});
});
test('task state with max alerts warning', () => {
const { status, metrics } = executionStatusFromState({
alertInstances: { a: {} },
metrics: { ...executionMetrics, hasReachedAlertLimit: true },
});
checkDateIsNearNow(status.lastExecutionDate);
expect(status.warning).toEqual({
message: translations.taskRunner.warning.maxAlerts,
reason: RuleExecutionStatusWarningReasons.MAX_ALERTS,
});
expect(status.status).toBe('warning');
expect(status.error).toBe(undefined);
testExpectedMetrics(metrics!, {
...executionMetrics,
hasReachedAlertLimit: true,
});
});
});
describe('executionStatusFromError()', () => {

View file

@ -30,26 +30,31 @@ export function executionStatusFromState(
): IExecutionStatusAndMetrics {
const alertIds = Object.keys(stateWithMetrics.alertInstances ?? {});
const hasIncompleteAlertExecution =
stateWithMetrics.metrics.triggeredActionsStatus === ActionsCompletion.PARTIAL;
let status: RuleExecutionStatuses =
alertIds.length === 0 ? RuleExecutionStatusValues[0] : RuleExecutionStatusValues[1];
if (hasIncompleteAlertExecution) {
// Check for warning states
let warning = null;
// We only have a single warning field so prioritizing the alert circuit breaker over the actions circuit breaker
if (stateWithMetrics.metrics.hasReachedAlertLimit) {
status = RuleExecutionStatusValues[5];
warning = {
reason: RuleExecutionStatusWarningReasons.MAX_ALERTS,
message: translations.taskRunner.warning.maxAlerts,
};
} else if (stateWithMetrics.metrics.triggeredActionsStatus === ActionsCompletion.PARTIAL) {
status = RuleExecutionStatusValues[5];
warning = {
reason: RuleExecutionStatusWarningReasons.MAX_EXECUTABLE_ACTIONS,
message: translations.taskRunner.warning.maxExecutableActions,
};
}
return {
status: {
lastExecutionDate: lastExecutionDate ?? new Date(),
status,
...(hasIncompleteAlertExecution && {
warning: {
reason: RuleExecutionStatusWarningReasons.MAX_EXECUTABLE_ACTIONS,
message: translations.taskRunner.warning.maxExecutableActions,
},
}),
...(warning ? { warning } : {}),
},
metrics: stateWithMetrics.metrics,
};
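
For illustration only: when both circuit breakers trip in the same run, the alert limit takes precedence in the reported warning (compare the "task state with max alerts warning" test above). Metric values and import paths are illustrative.

import { executionStatusFromState } from './rule_execution_status'; // illustrative path
import { ActionsCompletion } from '../types'; // illustrative path

const { status } = executionStatusFromState({
  alertInstances: { a: {} },
  metrics: {
    numSearches: 1,
    totalSearchDurationMs: 10,
    esSearchDurationMs: 5,
    numberOfTriggeredActions: 0,
    numberOfGeneratedActions: 0,
    numberOfActiveAlerts: 1,
    numberOfNewAlerts: 1,
    numberOfRecoveredAlerts: 0,
    hasReachedAlertLimit: true,
    // Even though actions were also truncated, the maxAlerts warning wins.
    triggeredActionsStatus: ActionsCompletion.PARTIAL,
  },
});

status.status; // 'warning'
status.warning?.reason; // 'maxAlerts' (RuleExecutionStatusWarningReasons.MAX_ALERTS)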

View file

@ -24,6 +24,7 @@ describe('RuleRunMetricsStore', () => {
expect(ruleRunMetricsStore.getNumberOfRecoveredAlerts()).toBe(0);
expect(ruleRunMetricsStore.getNumberOfNewAlerts()).toBe(0);
expect(ruleRunMetricsStore.getStatusByConnectorType('any')).toBe(undefined);
expect(ruleRunMetricsStore.getHasReachedAlertLimit()).toBe(false);
});
test('sets and returns numSearches', () => {
@ -77,6 +78,11 @@ describe('RuleRunMetricsStore', () => {
expect(ruleRunMetricsStore.getTriggeredActionsStatus()).toBe(ActionsCompletion.PARTIAL);
});
test('sets and returns hasReachedAlertLimit', () => {
ruleRunMetricsStore.setHasReachedAlertLimit(true);
expect(ruleRunMetricsStore.getHasReachedAlertLimit()).toBe(true);
});
test('gets metrics', () => {
expect(ruleRunMetricsStore.getMetrics()).toEqual({
triggeredActionsStatus: 'partial',
@ -88,6 +94,7 @@ describe('RuleRunMetricsStore', () => {
numberOfRecoveredAlerts: 11,
numberOfTriggeredActions: 5,
totalSearchDurationMs: 2,
hasReachedAlertLimit: true,
});
});

View file

@ -18,6 +18,7 @@ interface State {
numberOfActiveAlerts: number;
numberOfRecoveredAlerts: number;
numberOfNewAlerts: number;
hasReachedAlertLimit: boolean;
connectorTypes: {
[key: string]: {
triggeredActionsStatus: ActionsCompletion;
@ -40,6 +41,7 @@ export class RuleRunMetricsStore {
numberOfActiveAlerts: 0,
numberOfRecoveredAlerts: 0,
numberOfNewAlerts: 0,
hasReachedAlertLimit: false,
connectorTypes: {},
};
@ -84,6 +86,9 @@ export class RuleRunMetricsStore {
triggeredActionsStatus: this.getTriggeredActionsStatus(),
};
};
public getHasReachedAlertLimit = () => {
return this.state.hasReachedAlertLimit;
};
// Setters
public setNumSearches = (numSearches: number) => {
@ -119,6 +124,9 @@ export class RuleRunMetricsStore {
}) => {
set(this.state, `connectorTypes["${actionTypeId}"].triggeredActionsStatus`, status);
};
public setHasReachedAlertLimit = (hasReachedAlertLimit: boolean) => {
this.state.hasReachedAlertLimit = hasReachedAlertLimit;
};
// Checkers
public hasReachedTheExecutableActionsLimit = (actionsConfigMap: ActionsConfigMap): boolean =>

View file

@ -104,6 +104,7 @@ const createRuleExecutorServicesMock = <
return {
alertFactory: {
create: jest.fn().mockReturnValue(alertFactoryMockCreate),
hasReachedAlertLimit: jest.fn().mockReturnValue(false),
done: jest.fn().mockReturnValue(alertFactoryMockDone),
},
savedObjectsClient: savedObjectsClientMock.create(),

View file

@ -40,6 +40,9 @@ const generateAlertingConfig = (): AlertingConfig => ({
actions: {
max: 1000,
},
alerts: {
max: 1000,
},
},
},
});

View file

@ -442,6 +442,7 @@ export class AlertingPlugin {
supportsEphemeralTasks: plugins.taskManager.supportsEphemeralTasks(),
maxEphemeralActionsPerRule: this.config.maxEphemeralActionsPerAlert,
cancelAlertsOnRuleTimeout: this.config.cancelAlertsOnRuleTimeout,
maxAlerts: this.config.rules.run.alerts.max,
actionsConfigMap: getActionsConfigMap(this.config.rules.run.actions),
usageCounter: this.usageCounter,
});

View file

@ -143,6 +143,7 @@ describe('Task Runner', () => {
kibanaBaseUrl: 'https://localhost:5601',
supportsEphemeralTasks: false,
maxEphemeralActionsPerRule: 10,
maxAlerts: 1000,
cancelAlertsOnRuleTimeout: true,
usageCounter: mockUsageCounter,
actionsConfigMap: {
@ -262,7 +263,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
3,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({ status: 'ok' });
@ -343,7 +344,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
4,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({
@ -430,7 +431,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
5,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({
@ -604,7 +605,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
5,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":2,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":2,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":2,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":2,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
);
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
}
@ -1119,7 +1120,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
5,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":2,"numberOfGeneratedActions":2,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":1,"numberOfNewAlerts":0,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":2,"numberOfGeneratedActions":2,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":1,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({
@ -1232,7 +1233,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
5,
`ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":2,"numberOfGeneratedActions":2,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":1,"numberOfNewAlerts":0,"triggeredActionsStatus":"complete"}`
`ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":2,"numberOfGeneratedActions":2,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":1,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}`
);
testAlertingEventLogCalls({
@ -2362,7 +2363,7 @@ describe('Task Runner', () => {
);
expect(logger.debug).nthCalledWith(
3,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
);
testAlertingEventLogCalls({
@ -2859,6 +2860,7 @@ describe('Task Runner', () => {
setRuleName = true,
logAlert = 0,
logAction = 0,
hasReachedAlertLimit = false,
}: {
status: string;
ruleContext?: RuleContextOpts;
@ -2873,6 +2875,7 @@ describe('Task Runner', () => {
logAction?: number;
errorReason?: string;
errorMessage?: string;
hasReachedAlertLimit?: boolean;
}) {
expect(alertingEventLogger.initialize).toHaveBeenCalledWith(ruleContext);
expect(alertingEventLogger.start).toHaveBeenCalled();
@ -2905,6 +2908,7 @@ describe('Task Runner', () => {
numberOfRecoveredAlerts: recoveredAlerts,
numberOfTriggeredActions: triggeredActions,
totalSearchDurationMs: 23423,
hasReachedAlertLimit,
triggeredActionsStatus: 'partial',
},
status: {
@ -2927,6 +2931,7 @@ describe('Task Runner', () => {
numberOfRecoveredAlerts: recoveredAlerts,
numberOfTriggeredActions: triggeredActions,
totalSearchDurationMs: 23423,
hasReachedAlertLimit,
triggeredActionsStatus: 'complete',
},
status: {

View file

@ -108,6 +108,7 @@ export class TaskRunner<
private readonly executionId: string;
private readonly ruleTypeRegistry: RuleTypeRegistry;
private readonly inMemoryMetrics: InMemoryMetrics;
private readonly maxAlerts: number;
private alertingEventLogger: AlertingEventLogger;
private usageCounter?: UsageCounter;
private searchAbortController: AbortController;
@ -138,6 +139,7 @@ export class TaskRunner<
this.cancelled = false;
this.executionId = uuid.v4();
this.inMemoryMetrics = inMemoryMetrics;
this.maxAlerts = context.maxAlerts;
this.alertingEventLogger = new AlertingEventLogger(this.context.eventLogger);
}
@ -228,22 +230,26 @@ export class TaskRunner<
executionHandler: ExecutionHandler<ActionGroupIds | RecoveryActionGroupId>,
ruleRunMetricsStore: RuleRunMetricsStore
) {
const {
actionGroup,
subgroup: actionSubgroup,
context,
state,
} = alert.getScheduledActionOptions()!;
alert.updateLastScheduledActions(actionGroup, actionSubgroup);
alert.unscheduleActions();
return executionHandler({
actionGroup,
actionSubgroup,
context,
state,
alertId,
ruleRunMetricsStore,
});
if (alert.hasScheduledActions()) {
const {
actionGroup,
subgroup: actionSubgroup,
context,
state,
} = alert.getScheduledActionOptions()!;
alert.updateLastScheduledActions(actionGroup, actionSubgroup);
alert.unscheduleActions();
return executionHandler({
actionGroup,
actionSubgroup,
context,
state,
alertId,
ruleRunMetricsStore,
});
}
return Promise.resolve();
}
private async executeRule(
@ -279,6 +285,8 @@ export class TaskRunner<
},
} = this.taskInstance;
const ruleRunMetricsStore = new RuleRunMetricsStore();
const executionHandler = this.getExecutionHandler(
ruleId,
rule.name,
@ -325,6 +333,16 @@ export class TaskRunner<
searchSourceClient,
});
const alertFactory = createAlertFactory<
State,
Context,
WithoutReservedActionGroups<ActionGroupIds, RecoveryActionGroupId>
>({
alerts,
logger: this.logger,
maxAlerts: this.maxAlerts,
canSetRecoveryContext: ruleType.doesSetRecoveryContext ?? false,
});
let updatedRuleTypeState: void | Record<string, unknown>;
try {
const ctx = {
@ -349,15 +367,7 @@ export class TaskRunner<
searchSourceClient: wrappedSearchSourceClient.searchSourceClient,
uiSettingsClient: this.context.uiSettings.asScopedToClient(savedObjectsClient),
scopedClusterClient: wrappedScopedClusterClient.client(),
alertFactory: createAlertFactory<
State,
Context,
WithoutReservedActionGroups<ActionGroupIds, RecoveryActionGroupId>
>({
alerts,
logger: this.logger,
canSetRecoveryContext: ruleType.doesSetRecoveryContext ?? false,
}),
alertFactory,
shouldWriteAlerts: () => this.shouldLogAndScheduleActionsForAlerts(),
shouldStopExecution: () => this.cancelled,
},
@ -391,15 +401,23 @@ export class TaskRunner<
})
);
} catch (err) {
this.alertingEventLogger.setExecutionFailed(
`rule execution failure: ${ruleLabel}`,
err.message
);
this.logger.error(err, {
tags: [this.ruleType.id, ruleId, 'rule-run-failed'],
error: { stack_trace: err.stack },
});
throw new ErrorWithReason(RuleExecutionStatusErrorReasons.Execute, err);
// Check if this error is due to reaching the alert limit
if (alertFactory.hasReachedAlertLimit()) {
this.logger.warn(
`rule execution generated greater than ${this.maxAlerts} alerts: ${ruleLabel}`
);
ruleRunMetricsStore.setHasReachedAlertLimit(true);
} else {
this.alertingEventLogger.setExecutionFailed(
`rule execution failure: ${ruleLabel}`,
err.message
);
this.logger.error(err, {
tags: [this.ruleType.id, ruleId, 'rule-run-failed'],
error: { stack_trace: err.stack },
});
throw new ErrorWithReason(RuleExecutionStatusErrorReasons.Execute, err);
}
}
this.alertingEventLogger.setExecutionSucceeded(`rule executed: ${ruleLabel}`);
@ -415,7 +433,6 @@ export class TaskRunner<
scopedClusterClientMetrics.esSearchDurationMs +
searchSourceClientMetrics.esSearchDurationMs,
};
const ruleRunMetricsStore = new RuleRunMetricsStore();
ruleRunMetricsStore.setNumSearches(searchMetrics.numSearches);
ruleRunMetricsStore.setTotalSearchDurationMs(searchMetrics.totalSearchDurationMs);
@ -426,7 +443,12 @@ export class TaskRunner<
Context,
ActionGroupIds,
RecoveryActionGroupId
>(alerts, originalAlerts);
>({
alerts,
existingAlerts: originalAlerts,
hasReachedAlertLimit: alertFactory.hasReachedAlertLimit(),
alertLimit: this.maxAlerts,
});
logAlerts({
logger: this.logger,

View file

@ -120,6 +120,7 @@ describe('Task Runner Cancel', () => {
kibanaBaseUrl: 'https://localhost:5601',
supportsEphemeralTasks: false,
maxEphemeralActionsPerRule: 10,
maxAlerts: 1000,
cancelAlertsOnRuleTimeout: true,
usageCounter: mockUsageCounter,
actionsConfigMap: {
@ -414,7 +415,7 @@ describe('Task Runner Cancel', () => {
);
expect(logger.debug).nthCalledWith(
7,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"triggeredActionsStatus":"complete"}'
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":1,"numberOfGeneratedActions":1,"numberOfActiveAlerts":1,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":1,"hasReachedAlertLimit":false,"triggeredActionsStatus":"complete"}'
);
}
@ -428,6 +429,7 @@ describe('Task Runner Cancel', () => {
status,
logAlert = 0,
logAction = 0,
hasReachedAlertLimit = false,
}: {
status: string;
ruleContext?: RuleContextOpts;
@ -439,6 +441,7 @@ describe('Task Runner Cancel', () => {
setRuleName?: boolean;
logAlert?: number;
logAction?: number;
hasReachedAlertLimit?: boolean;
}) {
expect(alertingEventLogger.initialize).toHaveBeenCalledWith(ruleContext);
expect(alertingEventLogger.start).toHaveBeenCalled();
@ -455,6 +458,7 @@ describe('Task Runner Cancel', () => {
numberOfRecoveredAlerts: recoveredAlerts,
numberOfTriggeredActions: triggeredActions,
totalSearchDurationMs: 23423,
hasReachedAlertLimit,
triggeredActionsStatus: 'complete',
},
status: {

View file

@ -98,6 +98,7 @@ describe('Task Runner Factory', () => {
kibanaBaseUrl: 'https://localhost:5601',
supportsEphemeralTasks: true,
maxEphemeralActionsPerRule: 10,
maxAlerts: 1000,
cancelAlertsOnRuleTimeout: true,
executionContext,
usageCounter: mockUsageCounter,

View file

@ -53,6 +53,7 @@ export interface TaskRunnerContext {
kibanaBaseUrl: string | undefined;
supportsEphemeralTasks: boolean;
maxEphemeralActionsPerRule: number;
maxAlerts: number;
actionsConfigMap: ActionsConfigMap;
cancelAlertsOnRuleTimeout: boolean;
usageCounter?: UsageCounter;

View file

@ -80,6 +80,7 @@ export interface RuleExecutorServices<
scopedClusterClient: IScopedClusterClient;
alertFactory: {
create: (id: string) => PublicAlert<InstanceState, InstanceContext, ActionGroupIds>;
hasReachedAlertLimit: () => boolean;
done: () => AlertFactoryDoneUtils<InstanceState, InstanceContext, ActionGroupIds>;
};
shouldWriteAlerts: () => boolean;
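
For illustration only: with `hasReachedAlertLimit()` now on the public `alertFactory`, a rule type that sets recovery context can account for the circuit breaker. The structural type and context fields below are illustrative; only the method names come from the interface above.

// Illustrative executor fragment using only the alertFactory surface shown above.
function handleRecovery(alertFactory: {
  hasReachedAlertLimit: () => boolean;
  done: () => { getRecoveredAlerts: () => Array<{ setContext: (ctx: object) => void }> };
}) {
  // When the limit was hit this run, recovery detection is skipped, so this list is empty
  // and recovery notifications may be delayed until a later run.
  for (const recovered of alertFactory.done().getRecoveredAlerts()) {
    recovered.setContext({ reason: 'condition no longer met' });
  }
  if (alertFactory.hasReachedAlertLimit()) {
    // The framework already sets the rule's execution status to "warning" with reason "maxAlerts";
    // a rule type can additionally log or annotate here if it wants to surface the truncation.
  }
}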

View file

@ -73,6 +73,7 @@ function createRule(shouldWriteAlerts: boolean = true) {
scheduleActions,
} as any;
},
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
};

View file

@ -185,6 +185,7 @@ export const previewRulesRoute = async (
| 'getContext'
| 'hasContext'
>;
hasReachedAlertLimit: () => boolean;
done: () => { getRecoveredAlerts: () => [] };
}
) => {
@ -284,7 +285,11 @@ export const previewRulesRoute = async (
queryAlertType.name,
previewRuleParams,
() => true,
{ create: alertInstanceFactoryStub, done: () => ({ getRecoveredAlerts: () => [] }) }
{
create: alertInstanceFactoryStub,
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
}
);
break;
case 'saved_query':
@ -297,7 +302,11 @@ export const previewRulesRoute = async (
savedQueryAlertType.name,
previewRuleParams,
() => true,
{ create: alertInstanceFactoryStub, done: () => ({ getRecoveredAlerts: () => [] }) }
{
create: alertInstanceFactoryStub,
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
}
);
break;
case 'threshold':
@ -310,7 +319,11 @@ export const previewRulesRoute = async (
thresholdAlertType.name,
previewRuleParams,
() => true,
{ create: alertInstanceFactoryStub, done: () => ({ getRecoveredAlerts: () => [] }) }
{
create: alertInstanceFactoryStub,
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
}
);
break;
case 'threat_match':
@ -323,7 +336,11 @@ export const previewRulesRoute = async (
threatMatchAlertType.name,
previewRuleParams,
() => true,
{ create: alertInstanceFactoryStub, done: () => ({ getRecoveredAlerts: () => [] }) }
{
create: alertInstanceFactoryStub,
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
}
);
break;
case 'eql':
@ -334,7 +351,11 @@ export const previewRulesRoute = async (
eqlAlertType.name,
previewRuleParams,
() => true,
{ create: alertInstanceFactoryStub, done: () => ({ getRecoveredAlerts: () => [] }) }
{
create: alertInstanceFactoryStub,
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
}
);
break;
case 'machine_learning':
@ -345,7 +366,11 @@ export const previewRulesRoute = async (
mlAlertType.name,
previewRuleParams,
() => true,
{ create: alertInstanceFactoryStub, done: () => ({ getRecoveredAlerts: () => [] }) }
{
create: alertInstanceFactoryStub,
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
}
);
break;
case 'new_terms':
@ -356,7 +381,11 @@ export const previewRulesRoute = async (
newTermsAlertType.name,
previewRuleParams,
() => true,
{ create: alertInstanceFactoryStub, done: () => ({ getRecoveredAlerts: () => [] }) }
{
create: alertInstanceFactoryStub,
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
}
);
break;
default:

View file

@ -76,6 +76,7 @@ export const createRuleTypeMocks = (
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
alertFactory: {
create: jest.fn(() => ({ scheduleActions })),
hasReachedAlertLimit: () => false,
done: jest.fn().mockResolvedValue({}),
},
findAlerts: jest.fn(), // TODO: does this stay?

View file

@ -40,6 +40,7 @@ const alertFactory = (contextKeys: unknown[], testAlertActionArr: unknown[]) =>
);
return alertInstance;
},
hasReachedAlertLimit: () => false,
done: () => ({ getRecoveredAlerts: () => [] }),
});

View file

@ -128,6 +128,13 @@ export const ALERT_WARNING_MAX_EXECUTABLE_ACTIONS_REASON = i18n.translate(
}
);
export const ALERT_WARNING_MAX_ALERTS_REASON = i18n.translate(
'xpack.triggersActionsUI.sections.rulesList.ruleWarningReasonMaxAlerts',
{
defaultMessage: 'Alert limit exceeded',
}
);
export const ALERT_WARNING_UNKNOWN_REASON = i18n.translate(
'xpack.triggersActionsUI.sections.rulesList.ruleWarningReasonUnknown',
{
@ -148,5 +155,6 @@ export const rulesErrorReasonTranslationsMapping = {
export const rulesWarningReasonTranslationsMapping = {
maxExecutableActions: ALERT_WARNING_MAX_EXECUTABLE_ACTIONS_REASON,
maxAlerts: ALERT_WARNING_MAX_ALERTS_REASON,
unknown: ALERT_WARNING_UNKNOWN_REASON,
};

View file

@ -175,6 +175,7 @@ export function createTestConfig(name: string, options: CreateTestConfigOptions)
'--xpack.alerting.invalidateApiKeysTask.interval="15s"',
'--xpack.alerting.healthCheck.interval="1s"',
'--xpack.alerting.rules.minimumScheduleInterval.value="1s"',
'--xpack.alerting.rules.run.alerts.max=20',
`--xpack.alerting.rules.run.actions.connectorTypeOverrides=${JSON.stringify([
{ id: 'test.capped', max: '1' },
])}`,

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FtrProviderContext } from '../../../../../common/ftr_provider_context';
// eslint-disable-next-line import/no-default-export
export default function alertingCircuitBreakerTests({ loadTestFile }: FtrProviderContext) {
describe('circuit_breakers', () => {
loadTestFile(require.resolve('./max_alerts'));
});
}

View file

@ -0,0 +1,277 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { Spaces } from '../../../../scenarios';
import { FtrProviderContext } from '../../../../../common/ftr_provider_context';
import {
ESTestIndexTool,
ES_TEST_INDEX_NAME,
getUrlPrefix,
ObjectRemover,
getEventLog,
} from '../../../../../common/lib';
import { createEsDocumentsWithGroups } from '../lib/create_test_data';
const RULE_INTERVAL_SECONDS = 6;
const RULE_INTERVALS_TO_WRITE = 1;
const RULE_INTERVAL_MILLIS = RULE_INTERVAL_SECONDS * 1000;
// eslint-disable-next-line import/no-default-export
export default function maxAlertsRuleTests({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const retry = getService('retry');
const es = getService('es');
const esTestIndexTool = new ESTestIndexTool(es, retry);
describe('rule that hits max alerts circuit breaker', () => {
const objectRemover = new ObjectRemover(supertest);
beforeEach(async () => {
await esTestIndexTool.destroy();
await esTestIndexTool.setup();
});
afterEach(async () => {
await objectRemover.removeAll();
await esTestIndexTool.destroy();
});
it('persist existing alerts to next execution if circuit breaker is hit', async () => {
// write some documents in groups 0, 1, 2, 3, 4
await createEsDocumentsInGroups(5, getEndDate());
// create a rule that will always fire for each group
const ruleId = await createIndexThresholdRule({
name: 'always fire',
aggType: 'max',
aggField: 'testedValue',
groupBy: 'top',
termField: 'group',
termSize: 50,
thresholdComparator: '>=',
threshold: [0],
});
// make sure rule executes once before adding more documents
await retry.try(async () => {
return await getEventLog({
getService,
spaceId: Spaces.space1.id,
type: 'alert',
id: ruleId,
provider: 'alerting',
actions: new Map([['execute', { gte: 1 }]]),
});
});
// circuit breaker value is 20 so write some more docs for 20+ groups
// with a group offset value of 2 so that we won't see groups 0 or 1 in this set
// this should trigger the circuit breaker and while we'd expect groups 0 and 1
// to recover under normal conditions, they should stay active because the
// circuit breaker hit
await createEsDocumentsInGroups(22, getEndDate(), 2);
// get the events we're expecting
const events = await retry.try(async () => {
return await getEventLog({
getService,
spaceId: Spaces.space1.id,
type: 'alert',
id: ruleId,
provider: 'alerting',
actions: new Map([
['execute', { gte: 2 }],
['new-instance', { gte: 1 }],
['active-instance', { gte: 1 }],
]),
});
});
// execute events
const executeEvents = events.filter((event) => event?.event?.action === 'execute');
// earliest execute event should have 5 active, 5 new alerts
const firstExecuteEvent = executeEvents[0];
expect(firstExecuteEvent?.event?.outcome).to.eql('success');
expect(firstExecuteEvent?.kibana?.alerting?.status).to.eql('active');
expect(
firstExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.active
).to.eql(5);
expect(firstExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.new).to.eql(
5
);
expect(
firstExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.recovered
).to.eql(0);
// second execute event should have warning status and active alerts should be at max (20)
// should have 15 new alerts (5 existing from previous run + 15 added until max hit)
// should have no recovered alerts
const secondExecuteEvent = executeEvents[1];
expect(secondExecuteEvent?.event?.outcome).to.eql('success');
expect(secondExecuteEvent?.event?.reason).to.eql('maxAlerts');
expect(secondExecuteEvent?.kibana?.alerting?.status).to.eql('warning');
expect(
secondExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.active
).to.eql(20);
expect(secondExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.new).to.eql(
15
);
expect(
secondExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.recovered
).to.eql(0);
// get execution uuid for second execute event and get all active instances for this uuid
const executionUuid = secondExecuteEvent?.kibana?.alert?.rule?.execution?.uuid;
const activeAlerts = events.filter(
(event) =>
event?.kibana?.alert?.rule?.execution?.uuid === executionUuid &&
event?.event?.action === 'active-instance'
);
const activeAlertIds = activeAlerts.map((alert) => alert?.kibana?.alerting?.instance_id);
// active alert ids should include all 5 alert ids from the first execution
expect(activeAlertIds.includes('group-0')).to.be(true);
expect(activeAlertIds.includes('group-1')).to.be(true);
expect(activeAlertIds.includes('group-2')).to.be(true);
expect(activeAlertIds.includes('group-3')).to.be(true);
expect(activeAlertIds.includes('group-4')).to.be(true);
// create more es documents that will fall under the circuit breaker
// offset by 2 so we expect groups 0 and 1 to finally recover
// it looks like alerts were reported in reverse order (group-23, 22, 21, down to 9)
// so all the 15 new alerts will recover, leading to 17 recovered alerts
// so our active alerts will be groups 2, 3, 4, 5 and 6 with groups 5 and 6 as new alerts
await createEsDocumentsInGroups(5, getEndDate(), 2);
const recoveredEvents = await retry.try(async () => {
return await getEventLog({
getService,
spaceId: Spaces.space1.id,
type: 'alert',
id: ruleId,
provider: 'alerting',
actions: new Map([['recovered-instance', { gte: 17 }]]),
});
});
// the "execute" event is written at the end of rule execution, so by the time we've
// seen the expected number of recovered-instance events the final "execute" event
// often hasn't been written yet; use the execution UUID to fetch it directly
const recoveredEventExecutionUuid = recoveredEvents[0]?.kibana?.alert?.rule?.execution?.uuid;
const finalExecuteEvents = await retry.try(async () => {
return await getEventLog({
getService,
spaceId: Spaces.space1.id,
type: 'alert',
id: ruleId,
provider: 'alerting',
filter: `kibana.alert.rule.execution.uuid:(${recoveredEventExecutionUuid})`,
actions: new Map([['execute', { gte: 1 }]]),
});
});
// get the latest execute event
const finalExecuteEvent = finalExecuteEvents[0];
expect(finalExecuteEvent?.event?.outcome).to.eql('success');
expect(finalExecuteEvent?.kibana?.alerting?.status).to.eql('active');
expect(finalExecuteEvent?.event?.reason).to.be(undefined);
expect(
finalExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.active
).to.be(5);
expect(finalExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.new).to.be(
2
);
expect(
finalExecuteEvent?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.recovered
).to.be(17);
const recoveredAlertIds = recoveredEvents.map(
(alert) => alert?.kibana?.alerting?.instance_id
);
expect(recoveredAlertIds.includes('group-0')).to.be(true);
expect(recoveredAlertIds.includes('group-1')).to.be(true);
});
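For reference, the assertions in this test traverse a fairly deep event-log document. Below is a rough sketch of only the fields they touch, inferred from the expectations above; it is not the authoritative event-log schema, and the interface name is invented for illustration.

// Approximate shape of an event-log document, inferred from the assertions in this test.
// Fields not touched by the test are omitted.
interface ExecuteEventSketch {
  event?: {
    action?: string; // 'execute' | 'new-instance' | 'active-instance' | 'recovered-instance'
    outcome?: string; // 'success' even on runs where the alert limit was hit
    reason?: string; // 'maxAlerts' when the circuit breaker tripped, otherwise undefined
  };
  kibana?: {
    alerting?: {
      status?: string; // 'active' normally, 'warning' when the limit was reached
      instance_id?: string; // e.g. 'group-3', present on the *-instance events
    };
    alert?: {
      rule?: {
        execution?: {
          uuid?: string; // ties the *-instance events to the "execute" event of the same run
          metrics?: {
            alert_counts?: { active?: number; new?: number; recovered?: number };
          };
        };
      };
    };
  };
}

The execution uuid is what lets the test tie the new/active/recovered instance events back to the run that produced them.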
interface CreateRuleParams {
name: string;
aggType: string;
aggField?: string;
timeField?: string;
timeWindowSize?: number;
groupBy: 'all' | 'top';
termField?: string;
termSize?: number;
thresholdComparator: string;
threshold: number[];
notifyWhen?: string;
indexName?: string;
}
async function createIndexThresholdRule(params: CreateRuleParams): Promise<string> {
const { status, body: createdRule } = await supertest
.post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`)
.set('kbn-xsrf', 'foo')
.send({
name: params.name,
consumer: 'alerts',
enabled: true,
rule_type_id: '.index-threshold',
schedule: { interval: `${RULE_INTERVAL_SECONDS}s` },
actions: [],
notify_when: 'onActiveAlert',
params: {
index: params.indexName || ES_TEST_INDEX_NAME,
timeField: params.timeField || 'date',
aggType: params.aggType,
aggField: params.aggField,
groupBy: params.groupBy,
termField: params.termField,
termSize: params.termSize,
timeWindowSize: params.timeWindowSize ?? RULE_INTERVAL_SECONDS * 5,
timeWindowUnit: 's',
thresholdComparator: params.thresholdComparator,
threshold: params.threshold,
},
});
expect(status).to.be(200);
const ruleId = createdRule.id;
objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting');
return ruleId;
}
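The circuit breaker test above presumably creates its rule through this helper. Its actual arguments sit earlier in the file, outside this excerpt, so the call below is only a hypothetical sketch of a grouped count rule that could produce one alert per group:

// Illustrative only: these parameter values are assumptions, not the test's actual call.
const ruleId = await createIndexThresholdRule({
  name: 'circuit breaker rule',
  aggType: 'count',
  groupBy: 'top',
  termField: 'group',
  termSize: 30, // larger than the 20-alert circuit breaker so the limit can be hit
  thresholdComparator: '>',
  threshold: [0], // any group with at least one matching doc becomes an alert
});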
async function createEsDocumentsInGroups(
groups: number,
endDate: string,
groupOffset: number = 0
) {
await createEsDocumentsWithGroups({
es,
esTestIndexTool,
endDate,
intervals: RULE_INTERVALS_TO_WRITE,
intervalMillis: RULE_INTERVAL_MILLIS,
groups,
indexName: ES_TEST_INDEX_NAME,
groupOffset,
});
}
function getEndDate() {
const endDateMillis = Date.now() + (RULE_INTERVALS_TO_WRITE - 1) * RULE_INTERVAL_MILLIS;
return new Date(endDateMillis).toISOString();
}
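A note on the arithmetic in getEndDate, inferred from the doc-writing helper above rather than stated in the test; the interval constants are defined earlier in the file, so the numbers below are example values only:

// With RULE_INTERVALS_TO_WRITE = 4 and RULE_INTERVAL_MILLIS = 1000 (example values):
//   getEndDate()                              -> now + 3000 ms
//   documents are written at endDate - 500 - i * 1000 for i = 0..3,
//   i.e. near now - 500, now + 500, now + 1500 and now + 2500 ms
// so each of the next few rule runs should find fresh documents inside its time window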
});
}

View file

@@ -14,5 +14,6 @@ export default function alertingTests({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./es_query'));
loadTestFile(require.resolve('./long_running'));
loadTestFile(require.resolve('./cancellable'));
loadTestFile(require.resolve('./circuit_breaker'));
});
}

View file

@@ -15,7 +15,7 @@ import {
getUrlPrefix,
ObjectRemover,
} from '../../../../../common/lib';
import { createEsDocuments } from './create_test_data';
import { createEsDocumentsWithGroups } from '../lib/create_test_data';
import { createDataStream, deleteDataStream } from '../lib/create_test_data';
const RULE_TYPE_ID = '.index-threshold';
@@ -442,15 +442,15 @@ export default function ruleTests({ getService }: FtrProviderContext) {
groups: number,
indexName: string = ES_TEST_INDEX_NAME
) {
await createEsDocuments(
await createEsDocumentsWithGroups({
es,
esTestIndexTool,
endDate,
RULE_INTERVALS_TO_WRITE,
RULE_INTERVAL_MILLIS,
intervals: RULE_INTERVALS_TO_WRITE,
intervalMillis: RULE_INTERVAL_MILLIS,
groups,
indexName
);
indexName,
});
}
async function waitForDocs(count: number): Promise<any[]> {

View file

@@ -1,85 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Client } from '@elastic/elasticsearch';
import { times } from 'lodash';
import { v4 as uuid } from 'uuid';
import { ESTestIndexTool, ES_TEST_INDEX_NAME } from '../../../../../common/lib';
// default end date
export const END_DATE = '2020-01-01T00:00:00Z';
export const DOCUMENT_SOURCE = 'queryDataEndpointTests';
export const DOCUMENT_REFERENCE = '-na-';
// Create a set of es documents to run the queries against.
// Will create `groups` documents for each interval.
// The difference between the dates of the docs will be intervalMillis.
// The date of the last documents will be startDate - intervalMillis / 2.
// So the documents will be written in the middle of each interval range.
// The data value written to each doc is a power of 2 + the group index, with
// 2^0 as the value of the last documents, the values increasing for older
// documents.
export async function createEsDocuments(
es: Client,
esTestIndexTool: ESTestIndexTool,
endDate: string = END_DATE,
intervals: number = 1,
intervalMillis: number = 1000,
groups: number = 2,
indexName: string = ES_TEST_INDEX_NAME
) {
const endDateMillis = Date.parse(endDate) - intervalMillis / 2;
const promises: Array<Promise<unknown>> = [];
times(intervals, (interval) => {
const date = endDateMillis - interval * intervalMillis;
// base value for each window is 2^interval
const testedValue = 2 ** interval;
// don't need await on these, wait at the end of the function
times(groups, (group) => {
promises.push(createEsDocument(es, date, testedValue + group, `group-${group}`, indexName));
});
});
await Promise.all(promises);
const totalDocuments = intervals * groups;
await esTestIndexTool.waitForDocs(DOCUMENT_SOURCE, DOCUMENT_REFERENCE, totalDocuments);
}
async function createEsDocument(
es: Client,
epochMillis: number,
testedValue: number,
group: string,
indexName: string
) {
const document = {
source: DOCUMENT_SOURCE,
reference: DOCUMENT_REFERENCE,
date: new Date(epochMillis).toISOString(),
date_epoch_millis: epochMillis,
testedValue,
testedValueUnsigned: '18446744073709551615',
group,
'@timestamp': new Date(epochMillis).toISOString(),
};
const response = await es.index({
id: uuid(),
index: indexName,
refresh: 'wait_for',
op_type: 'create',
body: document,
});
// console.log(`writing document to ${ES_TEST_INDEX_NAME}:`, JSON.stringify(document, null, 4));
if (response.result !== 'created') {
throw new Error(`document not created: ${JSON.stringify(response)}`);
}
}
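The net effect of removing this file is a signature change for the doc-writing helper; a side-by-side sketch (variable names assumed to be in scope):

// Before: positional arguments (file deleted above)
await createEsDocuments(es, esTestIndexTool, endDate, intervals, intervalMillis, groups, indexName);
// After: a single options object with an added groupOffset (see ../lib/create_test_data below)
await createEsDocumentsWithGroups({ es, esTestIndexTool, endDate, intervals, intervalMillis, groups, groupOffset, indexName });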

View file

@@ -10,7 +10,7 @@ import expect from '@kbn/expect';
import { Spaces } from '../../../../scenarios';
import { FtrProviderContext } from '../../../../../common/ftr_provider_context';
import { ESTestIndexTool, ES_TEST_INDEX_NAME, getUrlPrefix } from '../../../../../common/lib';
import { createEsDocuments } from './create_test_data';
import { createEsDocumentsWithGroups } from '../lib/create_test_data';
import { createDataStream, deleteDataStream } from '../lib/create_test_data';
const API_URI = 'api/triggers_actions_ui/data/_indices';
@@ -27,7 +27,7 @@ export default function indicesEndpointTests({ getService }: FtrProviderContext)
before(async () => {
await esTestIndexTool.destroy();
await esTestIndexTool.setup();
await createEsDocuments(es, esTestIndexTool);
await createEsDocumentsWithGroups({ es, esTestIndexTool });
await createDataStream(es, ES_TEST_DATA_STREAM_NAME);
});

View file

@@ -12,7 +12,7 @@ import { Spaces } from '../../../../scenarios';
import { FtrProviderContext } from '../../../../../common/ftr_provider_context';
import { ESTestIndexTool, ES_TEST_INDEX_NAME, getUrlPrefix } from '../../../../../common/lib';
import { createEsDocuments } from './create_test_data';
import { createEsDocumentsWithGroups } from '../lib/create_test_data';
const INDEX_THRESHOLD_TIME_SERIES_QUERY_URL = 'api/triggers_actions_ui/data/_time_series_query';
@@ -60,7 +60,13 @@ export default function timeSeriesQueryEndpointTests({ getService }: FtrProvider
await esTestIndexTool.setup();
// To browse the documents created, comment out esTestIndexTool.destroy() in below, then:
// curl http://localhost:9220/.kibaka-alerting-test-data/_search?size=100 | json
await createEsDocuments(es, esTestIndexTool, START_DATE, INTERVALS, INTERVAL_MILLIS);
await createEsDocumentsWithGroups({
es,
esTestIndexTool,
endDate: START_DATE,
intervals: INTERVALS,
intervalMillis: INTERVAL_MILLIS,
});
});
after(async () => {

View file

@@ -42,11 +42,63 @@ export async function createEsDocuments(
await esTestIndexTool.waitForDocs(DOCUMENT_SOURCE, DOCUMENT_REFERENCE, totalDocuments);
}
interface CreateEsDocumentsOpts {
es: Client;
esTestIndexTool: ESTestIndexTool;
endDate?: string;
intervals?: number;
intervalMillis?: number;
groups?: number;
groupOffset?: number;
indexName?: string;
}
// Create a set of es documents to run the queries against.
// Will create `groups` documents for each interval.
// The difference between the dates of the docs will be intervalMillis.
// The date of the last documents will be endDate - intervalMillis / 2.
// So the documents will be written in the middle of each interval range.
// The data value written to each doc is a power of 2 + the group index, with
// 2^0 as the value of the last documents, the values increasing for older
// documents.
export async function createEsDocumentsWithGroups({
es,
esTestIndexTool,
endDate = END_DATE,
intervals = 1,
intervalMillis = 1000,
groups = 2,
groupOffset = 0,
indexName = ES_TEST_INDEX_NAME,
}: CreateEsDocumentsOpts) {
const endDateMillis = Date.parse(endDate) - intervalMillis / 2;
const promises: Array<Promise<unknown>> = [];
times(intervals, (interval) => {
const date = endDateMillis - interval * intervalMillis;
// base value for each window is 2^interval
const testedValue = 2 ** interval;
// don't need await on these, wait at the end of the function
times(groups, (group) => {
promises.push(
createEsDocument(es, date, testedValue + group, indexName, `group-${group + groupOffset}`)
);
});
});
await Promise.all(promises);
const totalDocuments = intervals * groups;
await esTestIndexTool.waitForDocs(DOCUMENT_SOURCE, DOCUMENT_REFERENCE, totalDocuments);
}
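To make the document layout concrete, here is an illustrative call, with arbitrary example values, followed by what the function above would write for it:

// Sketch only; parameter values are arbitrary examples, not taken from any test.
await createEsDocumentsWithGroups({
  es,
  esTestIndexTool,
  endDate: '2020-01-01T00:00:00Z',
  intervals: 2,
  intervalMillis: 1000,
  groups: 3,
  groupOffset: 10,
});
// Writes 6 documents:
//   interval 0, at 2019-12-31T23:59:59.500Z: testedValue 1, 2, 3 for group-10, group-11, group-12
//   interval 1, at 2019-12-31T23:59:58.500Z: testedValue 2, 3, 4 for group-10, group-11, group-12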
async function createEsDocument(
es: Client,
epochMillis: number,
testedValue: number,
indexName: string
indexName: string,
group?: string
) {
const document = {
source: DOCUMENT_SOURCE,
@@ -54,7 +106,9 @@ async function createEsDocument(
date: new Date(epochMillis).toISOString(),
date_epoch_millis: epochMillis,
testedValue,
testedValueUnsigned: '18446744073709551615',
'@timestamp': new Date(epochMillis).toISOString(),
...(group ? { group } : {}),
};
const response = await es.index({