[8.x] Create tracked alerts based on AAD (#213670) (#217234)

# Backport

This will backport the following commits from `main` to `8.x`:
- [Create tracked alerts based on AAD
(#213670)](https://github.com/elastic/kibana/pull/213670)

<!--- Backport version: 9.6.6 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

<!--BACKPORT [{"author":{"name":"Ersin
Erdal","email":"92688503+ersin-erdal@users.noreply.github.com"},"sourceCommit":{"committedDate":"2025-04-04T01:13:53Z","message":"Create
tracked alerts based on AAD (#213670)\n\nTowards making AAD source of
truth.\n\nThis PR creates a `trackedAlerts` object in the alertsClient
and removes\nthe dependency on task state to fetch tracked alerts.\n\nAs
fetching tracked alerts becomes a critical part, we throw an error\nwhen
it fails.\n\n---------\n\nCo-authored-by: kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"5bab6cf8361a7c8fec23e4d13ec1614da5dcec39","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:ResponseOps","backport:version","v9.1.0","v8.19.0"],"title":"Create
tracked alerts based on
AAD","number":213670,"url":"https://github.com/elastic/kibana/pull/213670","mergeCommit":{"message":"Create
tracked alerts based on AAD (#213670)\n\nTowards making AAD source of
truth.\n\nThis PR creates a `trackedAlerts` object in the alertsClient
and removes\nthe dependency on task state to fetch tracked alerts.\n\nAs
fetching tracked alerts becomes a critical part, we throw an error\nwhen
it fails.\n\n---------\n\nCo-authored-by: kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"5bab6cf8361a7c8fec23e4d13ec1614da5dcec39"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/213670","number":213670,"mergeCommit":{"message":"Create
tracked alerts based on AAD (#213670)\n\nTowards making AAD source of
truth.\n\nThis PR creates a `trackedAlerts` object in the alertsClient
and removes\nthe dependency on task state to fetch tracked alerts.\n\nAs
fetching tracked alerts becomes a critical part, we throw an error\nwhen
it fails.\n\n---------\n\nCo-authored-by: kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"5bab6cf8361a7c8fec23e4d13ec1614da5dcec39"}},{"branch":"8.x","label":"v8.19.0","branchLabelMappingKey":"^v8.19.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->
This commit is contained in:
Ersin Erdal 2025-04-04 23:03:07 +02:00 committed by GitHub
parent 1746395bec
commit 87c94906fb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 636 additions and 178 deletions

View file

@ -7,12 +7,14 @@
import { type TypeOf } from '@kbn/config-schema';
import * as v1 from './v1';
import * as v2 from './v2';
export const stateSchemaByVersion = {
1: v1.versionDefinition,
2: v2.versionDefinition,
};
const latest = v1;
const latest = v2;
/**
* WARNING: Do not modify the code below when doing a new version.
* Update the "latest" variable instead.
@ -31,6 +33,7 @@ export const emptyState: LatestTaskStateSchema = {
alertRecoveredInstances: {},
previousStartedAt: null,
summaryActions: {},
trackedExecutions: undefined,
};
type Mutable<T> = {

View file

@ -84,6 +84,7 @@ describe('migrateMeta', () => {
flapping: true,
maintenanceWindowIds: ['id1', 'id2'],
pendingRecoveredCount: 3,
activeCount: 1,
uuid: 'abc123',
};
@ -96,6 +97,7 @@ describe('migrateMeta', () => {
flapping: true,
maintenanceWindowIds: ['id1', 'id2'],
pendingRecoveredCount: 3,
activeCount: 1,
uuid: 'abc123',
};

View file

@ -69,6 +69,7 @@ export function migrateMeta(meta: unknown): TypeOf<typeof metaSchema> | undefine
pendingRecoveredCount: isNumber(meta.pendingRecoveredCount)
? meta.pendingRecoveredCount
: undefined,
activeCount: isNumber(meta.activeCount) ? meta.activeCount : undefined,
uuid: isString(meta.uuid) ? meta.uuid : undefined,
};
}

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { upMigration } from './migration';
import { versionSchema } from './schema';
export {
versionSchema,
throttledActionSchema,
rawAlertInstanceSchema,
metaSchema,
alertStateSchema,
lastScheduledActionsSchema,
} from './schema';
/**
 * Version definition consumed by `stateSchemaByVersion`: pairs this
 * version's up-migration function with the schema it migrates to.
 */
export const versionDefinition = {
  up: upMigration,
  schema: versionSchema,
};

View file

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { upMigration } from './migration';
// Verifies the v2 up-migration against a fully-populated v1-shaped state
// that already carries a valid `trackedExecutions` string array: every
// field must be carried forward unchanged (the migration is effectively a
// pass-through in this case).
describe('upMigration', () => {
  it('should return the migrated state object', () => {
    const inputState = {
      alertTypeState: {},
      alertInstances: {
        instance1: {
          meta: {
            lastScheduledActions: {
              group: 'group1',
              date: '2023-07-31T12:00:00Z',
            },
            flappingHistory: [true, false, true],
            flapping: true,
            maintenanceWindowIds: ['id1', 'id2'],
            pendingRecoveredCount: 3,
            activeCount: 1,
            uuid: 'abc123',
          },
          state: { key: 'value' },
        },
      },
      alertRecoveredInstances: {},
      previousStartedAt: '2023-07-30T12:00:00Z',
      summaryActions: {
        action1: { date: '2023-07-31T12:00:00Z' },
      },
      trackedExecutions: ['111-22-33'],
    };
    // Expected output is identical to the input: `trackedExecutions` is a
    // string array, so the migration preserves it rather than resetting it
    // to undefined.
    const expectedOutput = {
      alertTypeState: {},
      alertInstances: {
        instance1: {
          meta: {
            lastScheduledActions: {
              group: 'group1',
              date: '2023-07-31T12:00:00Z',
            },
            flappingHistory: [true, false, true],
            flapping: true,
            maintenanceWindowIds: ['id1', 'id2'],
            pendingRecoveredCount: 3,
            activeCount: 1,
            uuid: 'abc123',
          },
          state: { key: 'value' },
        },
      },
      alertRecoveredInstances: {},
      previousStartedAt: '2023-07-30T12:00:00Z',
      summaryActions: {
        action1: { date: '2023-07-31T12:00:00Z' },
      },
      trackedExecutions: ['111-22-33'],
    };
    const result = upMigration(inputState);
    expect(result).toEqual(expectedOutput);
  });
});

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { type TypeOf } from '@kbn/config-schema';
import { isStringArray } from '../lib';
import { versionSchema } from './schema';
type VersionSchema = TypeOf<typeof versionSchema>;

/**
 * Migrates a task state object to this version: all existing fields are
 * carried forward via spread, and `trackedExecutions` is normalized — it
 * is kept only when it is a string array, otherwise reset to `undefined`
 * so it conforms to the v2 schema.
 */
export const upMigration = (state: Record<string, unknown>): VersionSchema => {
  return {
    ...state,
    trackedExecutions: isStringArray(state.trackedExecutions) ? state.trackedExecutions : undefined,
  };
};

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema } from '@kbn/config-schema';
import { versionSchema as versionSchemaV1 } from '../v1';
export * from '../v1';
/**
 * v2 task state schema: extends the v1 schema with an optional
 * `trackedExecutions` field — an array of execution UUID strings.
 */
export const versionSchema = versionSchemaV1.extends({
  trackedExecutions: schema.maybe(schema.arrayOf(schema.string())),
});

View file

@ -24,6 +24,7 @@ const createAlertsClientMock = () => {
client: jest.fn(),
determineDelayedAlerts: jest.fn(),
determineFlappingAlerts: jest.fn(),
getTrackedExecutions: jest.fn(),
};
});
};

View file

@ -57,7 +57,7 @@ import type {
LogAlertsOpts,
} from './types';
import { legacyAlertsClientMock } from './legacy_alerts_client.mock';
import { keys, omit, range } from 'lodash';
import { keys, omit } from 'lodash';
import { alertingEventLoggerMock } from '../lib/alerting_event_logger/alerting_event_logger.mock';
import { ruleRunMetricsStoreMock } from '../lib/rule_run_metrics_store.mock';
import { expandFlattenedAlert } from './lib';
@ -139,7 +139,6 @@ const trackedAlert1Raw = {
uuid: 'abc',
},
};
const trackedAlert1 = new Alert('1', trackedAlert1Raw);
const trackedAlert2Raw = {
state: { foo: true, start: '2023-03-28T02:27:28.159Z', duration: '36000000000000' },
meta: {
@ -149,15 +148,15 @@ const trackedAlert2Raw = {
uuid: 'def',
},
};
const trackedAlert2 = new Alert('2', trackedAlert2Raw);
const trackedRecovered3 = new Alert('3', {
const trackedAlert3Raw = {
state: { foo: false },
meta: {
flapping: false,
flappingHistory: [true, false, false],
uuid: 'xyz',
},
});
};
const fetchedAlert1 = {
[TIMESTAMP]: '2023-03-28T12:27:28.159Z',
@ -223,6 +222,14 @@ const fetchedAlert2 = {
[TAGS]: ['rule-', '-tags'],
};
const fetchedAlert3 = {
...fetchedAlert2,
[TIMESTAMP]: '2023-05-28T13:27:28.159Z',
[ALERT_START]: '2023-05-28T02:27:28.159Z',
[ALERT_RULE_EXECUTION_UUID]: '34lrfhw-645g-as67-sdf5-2534fvf8vfnjks',
[ALERT_UUID]: 'xyz',
};
const getNewIndexedAlertDoc = (overrides = {}) => ({
[TIMESTAMP]: date,
[EVENT_ACTION]: 'open',
@ -377,14 +384,24 @@ describe('Alerts Client', () => {
alertDelay: 0,
};
logAlertsOpts = { shouldLogAlerts: false, ruleRunMetricsStore };
clusterClient.search.mockResolvedValue({
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 0, skipped: 0 },
hits: {
total: { relation: 'eq', value: 0 },
hits: [],
},
});
clusterClient.bulk.mockResponse({
errors: true,
took: 201,
items: [],
});
});
describe('initializeExecution()', () => {
test('should initialize LegacyAlertsClient', async () => {
mockLegacyAlertsClient.getTrackedAlerts.mockImplementation(() => ({
active: {},
recovered: {},
}));
const spy = jest
.spyOn(LegacyAlertsClientModule, 'LegacyAlertsClient')
.mockImplementation(() => mockLegacyAlertsClient);
@ -396,9 +413,6 @@ describe('Alerts Client', () => {
defaultExecutionOpts
);
// no alerts to query for
expect(clusterClient.search).not.toHaveBeenCalled();
spy.mockRestore();
});
@ -423,24 +437,25 @@ describe('Alerts Client', () => {
expect(mockLegacyAlertsClient.initializeExecution).toHaveBeenCalledWith(
defaultExecutionOpts
);
expect(mockLegacyAlertsClient.getTrackedAlerts).not.toHaveBeenCalled();
spy.mockRestore();
});
test('should query for alert UUIDs if they exist', async () => {
mockLegacyAlertsClient.getTrackedAlerts.mockImplementation(() => ({
active: { '1': trackedAlert1, '2': trackedAlert2 },
recovered: { '3': trackedRecovered3 },
}));
test('should query for alerts', async () => {
const spy = jest
.spyOn(LegacyAlertsClientModule, 'LegacyAlertsClient')
.mockImplementation(() => mockLegacyAlertsClient);
const alertsClient = new AlertsClient(alertsClientParams);
await alertsClient.initializeExecution(defaultExecutionOpts);
const executionOptionsWithAlerts = {
...defaultExecutionOpts,
activeAlertsFromState: { '1': trackedAlert1Raw, '2': trackedAlert2Raw },
recoveredAlertsFromState: { '3': trackedAlert3Raw },
};
await alertsClient.initializeExecution(executionOptionsWithAlerts);
expect(mockLegacyAlertsClient.initializeExecution).toHaveBeenCalledWith(
defaultExecutionOpts
executionOptionsWithAlerts
);
expect(clusterClient.search).toHaveBeenCalledWith({
@ -448,13 +463,16 @@ describe('Alerts Client', () => {
query: {
bool: {
filter: [
{ term: { 'kibana.alert.rule.uuid': '1' } },
{ terms: { 'kibana.alert.uuid': ['abc', 'def', 'xyz'] } },
{ term: { [ALERT_RULE_UUID]: '1' } },
{ terms: { [ALERT_UUID]: ['abc', 'def', 'xyz'] } },
],
},
},
seq_no_primary_term: true,
size: 3,
sort: {
'kibana.alert.start': 'desc',
},
},
index: useDataStreamForAlerts
? '.alerts-test.alerts-default'
@ -465,70 +483,32 @@ describe('Alerts Client', () => {
spy.mockRestore();
});
test('should split queries into chunks when there are greater than 10,000 alert UUIDs', async () => {
mockLegacyAlertsClient.getTrackedAlerts.mockImplementation(() => ({
active: range(15000).reduce((acc: Record<string, Alert<{}, {}>>, value: number) => {
const id = `${value}`;
acc[id] = new Alert(id, {
state: { foo: true },
meta: {
flapping: false,
flappingHistory: [true, false],
lastScheduledActions: { group: 'default', date: new Date().toISOString() },
uuid: id,
},
});
return acc;
}, {}),
recovered: {},
}));
test('should query for alerts with execution uuid when provided', async () => {
const spy = jest
.spyOn(LegacyAlertsClientModule, 'LegacyAlertsClient')
.mockImplementation(() => mockLegacyAlertsClient);
const alertsClient = new AlertsClient(alertsClientParams);
await alertsClient.initializeExecution(defaultExecutionOpts);
expect(mockLegacyAlertsClient.initializeExecution).toHaveBeenCalledWith(
defaultExecutionOpts
);
expect(clusterClient.search).toHaveBeenCalledTimes(2);
spy.mockRestore();
});
test('should log but not throw if query returns error', async () => {
clusterClient.search.mockImplementation(() => {
throw new Error('search failed!');
await alertsClient.initializeExecution({
...defaultExecutionOpts,
trackedExecutions: ['1234'],
});
expect(mockLegacyAlertsClient.initializeExecution).toHaveBeenCalledWith({
...defaultExecutionOpts,
trackedExecutions: ['1234'],
});
mockLegacyAlertsClient.getTrackedAlerts.mockImplementation(() => ({
active: { '1': trackedAlert1 },
recovered: {},
}));
const spy = jest
.spyOn(LegacyAlertsClientModule, 'LegacyAlertsClient')
.mockImplementation(() => mockLegacyAlertsClient);
const alertsClient = new AlertsClient(alertsClientParams);
await alertsClient.initializeExecution(defaultExecutionOpts);
expect(mockLegacyAlertsClient.initializeExecution).toHaveBeenCalledWith(
defaultExecutionOpts
);
expect(clusterClient.search).toHaveBeenCalledWith({
body: {
query: {
bool: {
filter: [
{ term: { 'kibana.alert.rule.uuid': '1' } },
{ terms: { 'kibana.alert.uuid': ['abc'] } },
],
must: [{ term: { [ALERT_RULE_UUID]: '1' } }],
filter: [{ terms: { [ALERT_RULE_EXECUTION_UUID]: ['1234'] } }],
},
},
size: 1,
seq_no_primary_term: true,
size: 2000,
},
index: useDataStreamForAlerts
? '.alerts-test.alerts-default'
@ -536,6 +516,48 @@ describe('Alerts Client', () => {
ignore_unavailable: true,
});
spy.mockRestore();
});
test('should not query for the alerts if the rule type is not a lifecycle rule', async () => {
const alertsClient = new AlertsClient({
...alertsClientParams,
ruleType: {
...alertsClientParams.ruleType,
autoRecoverAlerts: false, // not a lifecycle rule
},
});
await alertsClient.initializeExecution(defaultExecutionOpts);
expect(clusterClient.search).not.toHaveBeenCalled();
});
test('should log an error and throw if query returns error', async () => {
clusterClient.search.mockImplementation(() => {
throw new Error('search failed!');
});
const spy = jest
.spyOn(LegacyAlertsClientModule, 'LegacyAlertsClient')
.mockImplementation(() => mockLegacyAlertsClient);
const alertsClient = new AlertsClient(alertsClientParams);
const executionOptionsWithUuid = {
...defaultExecutionOpts,
trackedExecutions: ['1234'],
};
try {
await alertsClient.initializeExecution(executionOptionsWithUuid);
} catch (e) {
spy.mockRestore();
expect(e.message).toBe(`search failed!`);
}
expect(mockLegacyAlertsClient.initializeExecution).toHaveBeenCalledWith(
executionOptionsWithUuid
);
expect(logger.error).toHaveBeenCalledWith(
`Error searching for tracked alerts by UUID ${ruleInfo} - search failed!`,
logTags
@ -543,6 +565,94 @@ describe('Alerts Client', () => {
spy.mockRestore();
});
test('should generate tracked executions from the alerts when fetched by alert uuids', async () => {
clusterClient.search.mockResolvedValue({
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 1, skipped: 0 },
hits: {
total: { relation: 'eq', value: 2 },
hits: [
{
_id: 'abc',
_index: '.internal.alerts-test.alerts-default-000001',
_seq_no: 41,
_primary_term: 665,
_source: fetchedAlert1,
},
{
_id: 'def',
_index: '.internal.alerts-test.alerts-default-000002',
_seq_no: 42,
_primary_term: 666,
_source: fetchedAlert2,
},
{
_id: 'xyz',
_index: '.internal.alerts-test.alerts-default-000002',
_seq_no: 43,
_primary_term: 667,
_source: fetchedAlert3,
},
],
},
});
const alertsClient = new AlertsClient(alertsClientParams);
await alertsClient.initializeExecution({
...defaultExecutionOpts,
activeAlertsFromState: {
'1': trackedAlert1Raw,
'2': trackedAlert2Raw,
'3': trackedAlert3Raw,
},
});
expect(alertsClient.getTrackedExecutions()).toEqual(
new Set([
'5f6aa57d-3e22-484e-bae8-cbed868f4d28',
'34lrfhw-645g-as67-sdf5-2534fvf8vfnjks',
])
);
});
test('should generate tracked executions from the state when fetched by execution uuids', async () => {
clusterClient.search.mockResolvedValue({
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 1, skipped: 0 },
hits: {
total: { relation: 'eq', value: 2 },
hits: [
{
_id: 'abc',
_index: '.internal.alerts-test.alerts-default-000001',
_seq_no: 41,
_primary_term: 665,
_source: fetchedAlert1,
},
{
_id: 'def',
_index: '.internal.alerts-test.alerts-default-000002',
_seq_no: 42,
_primary_term: 666,
_source: fetchedAlert2,
},
],
},
});
const alertsClient = new AlertsClient(alertsClientParams);
await alertsClient.initializeExecution({
...defaultExecutionOpts,
trackedExecutions: ['111', '222', '333'],
});
expect(alertsClient.getTrackedExecutions()).toEqual(new Set(['111', '222', '333']));
});
});
describe('persistAlerts()', () => {
@ -1636,6 +1746,16 @@ describe('Alerts Client', () => {
});
test('should log and swallow error if bulk indexing throws error', async () => {
clusterClient.search.mockResolvedValue({
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 1, skipped: 0 },
hits: {
total: { relation: 'eq', value: 2 },
hits: [],
},
});
clusterClient.bulk.mockImplementation(() => {
throw new Error('fail');
});
@ -1655,7 +1775,11 @@ describe('Alerts Client', () => {
alertsClient.determineDelayedAlerts(determineDelayedAlertsOpts);
alertsClient.logAlerts(logAlertsOpts);
await alertsClient.persistAlerts();
try {
await alertsClient.persistAlerts();
} catch (e) {
expect(e.message).toBe(`fail`);
}
expect(clusterClient.bulk).toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(
@ -2159,7 +2283,6 @@ describe('Alerts Client', () => {
},
});
});
test('should get the persistent lifecycle alerts affected by scoped query successfully', async () => {
const alertsClient = new AlertsClient(alertsClientParams);
// @ts-ignore
@ -2703,9 +2826,11 @@ describe('Alerts Client', () => {
const alertSource = {
...mockAlertPayload,
[ALERT_INSTANCE_ID]: alertInstanceId,
[ALERT_STATUS]: 'active',
[ALERT_UUID]: 'abc',
};
clusterClient.search.mockResolvedValue({
const newClusterClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
newClusterClient.search.mockResolvedValue({
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 1, skipped: 0 },
@ -2727,12 +2852,15 @@ describe('Alerts Client', () => {
{},
'default',
'recovered'
>(alertsClientParams);
>({
...alertsClientParams,
elasticsearchClientPromise: Promise.resolve(newClusterClient),
});
await alertsClient.initializeExecution({
...defaultExecutionOpts,
activeAlertsFromState: {
[alertInstanceId]: {},
[alertInstanceId]: trackedAlert1Raw,
},
});
@ -3200,11 +3328,36 @@ describe('Alerts Client', () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>(
alertsClientParams
);
clusterClient.search.mockResolvedValue({
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 1, skipped: 0 },
hits: {
total: { relation: 'eq', value: 0 },
hits: [
{
_id: 'abc',
_index: '.internal.alerts-test.alerts-default-000001',
_seq_no: 42,
_primary_term: 666,
_source: fetchedAlert1,
},
{
_id: 'def',
_index: '.internal.alerts-test.alerts-default-000002',
_seq_no: 42,
_primary_term: 666,
_source: fetchedAlert2,
},
],
},
});
await alertsClient.initializeExecution({
...defaultExecutionOpts,
activeAlertsFromState: { '1': trackedAlert1Raw, '2': trackedAlert2Raw },
});
expect(alertsClient.isTrackedAlert('1')).toBe(true);
expect(alertsClient.isTrackedAlert('2')).toBe(true);
expect(alertsClient.isTrackedAlert('3')).toBe(false);

View file

@ -13,14 +13,18 @@ import {
ALERT_STATUS,
ALERT_UUID,
ALERT_MAINTENANCE_WINDOW_IDS,
ALERT_STATUS_ACTIVE,
ALERT_STATUS_RECOVERED,
ALERT_RULE_EXECUTION_UUID,
ALERT_START,
} from '@kbn/rule-data-utils';
import { chunk, flatMap, get, isEmpty, keys } from 'lodash';
import { flatMap, get, isEmpty, keys } from 'lodash';
import type { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { Alert } from '@kbn/alerts-as-data-utils';
import { DEFAULT_NAMESPACE_STRING } from '@kbn/core-saved-objects-utils-server';
import type { DeepPartial } from '@kbn/utility-types';
import type { BulkResponse } from '@elastic/elasticsearch/lib/api/types';
import { CLUSTER_BLOCK_EXCEPTION, isClusterBlockError } from '../lib/error_with_type';
import { CLUSTER_BLOCK_EXCEPTION } from '../lib/error_with_type';
import type { UntypedNormalizedRuleType } from '../rule_type_registry';
import type {
SummarizedAlerts,
@ -39,7 +43,6 @@ import type { IIndexPatternString } from '../alerts_service/resource_installer_u
import type {
IAlertsClient,
InitializeExecutionOpts,
TrackedAlerts,
ReportedAlert,
ReportedAlertData,
UpdateableAlert,
@ -67,9 +70,7 @@ import {
filterMaintenanceWindowsIds,
} from '../task_runner/maintenance_windows';
import { ErrorWithType } from '../lib/error_with_type';
// Term queries can take up to 10,000 terms
const CHUNK_SIZE = 10000;
import { DEFAULT_MAX_ALERTS } from '../config';
export interface AlertsClientParams extends CreateAlertsClientParams {
elasticsearchClientPromise: Promise<ElasticsearchClient>;
@ -102,13 +103,17 @@ export class AlertsClient<
// Query for alerts from the previous execution in order to identify the
// correct index to use if and when we need to make updates to existing active or
// recovered alerts
private fetchedAlerts: {
private trackedAlerts: {
indices: Record<string, string>;
data: Record<string, Alert & AlertData>;
active: Record<string, Alert & AlertData>;
recovered: Record<string, Alert & AlertData>;
seqNo: Record<string, number | undefined>;
primaryTerm: Record<string, number | undefined>;
get: (uuid: string) => Alert & AlertData;
getById: (id: string) => (Alert & AlertData) | undefined;
};
private trackedExecutions: Set<string>;
private startedAtString: string | null = null;
private runTimestampString: string | undefined;
private rule: AlertRule;
@ -142,7 +147,23 @@ export class AlertsClient<
? this.options.namespace
: DEFAULT_NAMESPACE_STRING,
});
this.fetchedAlerts = { indices: {}, data: {}, seqNo: {}, primaryTerm: {} };
this.trackedAlerts = {
indices: {},
active: {},
recovered: {},
seqNo: {},
primaryTerm: {},
get(uuid: string) {
return this.active[uuid] ?? this.recovered[uuid];
},
getById(id: string) {
return (
Object.values(this.active).find((alert) => get(alert, ALERT_INSTANCE_ID) === id) ??
Object.values(this.recovered).find((alert) => get(alert, ALERT_INSTANCE_ID) === id)
);
},
};
this.trackedExecutions = new Set([]);
this.rule = formatRule({ rule: this.options.rule, ruleType: this.options.ruleType });
this.ruleType = options.ruleType;
this._isUsingDataStreams = this.options.dataStreamAdapter.isUsingDataStreams();
@ -153,78 +174,96 @@ export class AlertsClient<
public async initializeExecution(opts: InitializeExecutionOpts) {
this.startedAtString = opts.startedAt ? opts.startedAt.toISOString() : null;
if (opts.runTimestamp) {
this.runTimestampString = opts.runTimestamp.toISOString();
const { runTimestamp, trackedExecutions } = opts;
if (runTimestamp) {
this.runTimestampString = runTimestamp.toISOString();
}
await this.legacyAlertsClient.initializeExecution(opts);
if (!this.ruleType.alerts?.shouldWrite) {
return;
}
// Get tracked alert UUIDs to query for
// TODO - we can consider refactoring to store the previous execution UUID and query
// for active and recovered alerts from the previous execution using that UUID
const trackedAlerts = this.legacyAlertsClient.getTrackedAlerts();
this.trackedExecutions = new Set(trackedExecutions ?? []);
const uuidsToFetch: string[] = [];
keys(trackedAlerts).forEach((key) => {
const tkey = key as keyof TrackedAlerts<LegacyState, LegacyContext>;
keys(trackedAlerts[tkey]).forEach((alertId: string) => {
uuidsToFetch.push(trackedAlerts[tkey][alertId].getUuid());
});
});
if (!uuidsToFetch.length) {
return;
}
const queryByUuid = async (uuids: string[]) => {
const result = await this.search({
size: uuids.length,
seq_no_primary_term: true,
query: {
bool: {
filter: [
{
term: {
[ALERT_RULE_UUID]: this.options.rule.id,
},
},
{
terms: {
[ALERT_UUID]: uuids,
},
},
],
// No need to fetch the tracked alerts for the non-lifecycle rules
if (this.ruleType.autoRecoverAlerts) {
const getTrackedAlertsByExecutionUuids = async (executionUuids: string[]) => {
const result = await this.search({
size: (opts.maxAlerts || DEFAULT_MAX_ALERTS) * 2,
seq_no_primary_term: true,
query: {
bool: {
must: [{ term: { [ALERT_RULE_UUID]: this.options.rule.id } }],
filter: [{ terms: { [ALERT_RULE_EXECUTION_UUID]: executionUuids } }],
},
},
},
});
return result.hits;
};
});
return result.hits;
};
try {
const results = await Promise.all(
chunk(uuidsToFetch, CHUNK_SIZE).map((uuidChunk: string[]) => queryByUuid(uuidChunk))
);
const getTrackedAlertsByAlertUuids = async () => {
const { activeAlertsFromState = {}, recoveredAlertsFromState = {} } = opts;
const uuidsToFetch: string[] = [];
Object.values(activeAlertsFromState).forEach((activeAlert) =>
uuidsToFetch.push(activeAlert.meta?.uuid!)
);
Object.values(recoveredAlertsFromState).forEach((recoveredAlert) =>
uuidsToFetch.push(recoveredAlert.meta?.uuid!)
);
for (const hit of results.flat()) {
const alertHit: Alert & AlertData = hit._source as Alert & AlertData;
const alertUuid = get(alertHit, ALERT_UUID);
const alertId = get(alertHit, ALERT_INSTANCE_ID);
if (uuidsToFetch.length <= 0) {
return [];
}
// Keep track of existing alert document so we can copy over data if alert is ongoing
this.fetchedAlerts.data[alertId] = alertHit;
const result = await this.search({
size: uuidsToFetch.length,
seq_no_primary_term: true,
sort: { [ALERT_START]: 'desc' },
query: {
bool: {
filter: [
{ term: { [ALERT_RULE_UUID]: this.options.rule.id } },
{ terms: { [ALERT_UUID]: uuidsToFetch } },
],
},
},
});
return result.hits;
};
// Keep track of index so we can update the correct document
this.fetchedAlerts.indices[alertUuid] = hit._index;
this.fetchedAlerts.seqNo[alertUuid] = hit._seq_no;
this.fetchedAlerts.primaryTerm[alertUuid] = hit._primary_term;
try {
const results = trackedExecutions
? await getTrackedAlertsByExecutionUuids(Array.from(this.trackedExecutions))
: await getTrackedAlertsByAlertUuids();
for (const hit of results.flat()) {
const alertHit = hit._source as Alert & AlertData;
const alertUuid = get(alertHit, ALERT_UUID);
if (get(alertHit, ALERT_STATUS) === ALERT_STATUS_ACTIVE) {
this.trackedAlerts.active[alertUuid] = alertHit;
}
if (get(alertHit, ALERT_STATUS) === ALERT_STATUS_RECOVERED) {
this.trackedAlerts.recovered[alertUuid] = alertHit;
}
this.trackedAlerts.indices[alertUuid] = hit._index;
this.trackedAlerts.seqNo[alertUuid] = hit._seq_no;
this.trackedAlerts.primaryTerm[alertUuid] = hit._primary_term;
// only when the alerts are fetched by alert uuids
if (!trackedExecutions) {
const executionUuid = get(alertHit, ALERT_RULE_EXECUTION_UUID);
if (executionUuid) {
this.trackedExecutions.add(executionUuid);
}
}
}
} catch (err) {
this.options.logger.error(
`Error searching for tracked alerts by UUID ${this.ruleInfoMessage} - ${err.message}`,
this.logTags
);
throw err;
}
} catch (err) {
this.options.logger.error(
`Error searching for tracked alerts by UUID ${this.ruleInfoMessage} - ${err.message}`,
this.logTags
);
}
}
@ -272,11 +311,10 @@ export class AlertsClient<
if (alert.payload) {
this.reportedAlerts[alert.id] = alert.payload;
}
return {
uuid: legacyAlert.getUuid(),
start: legacyAlert.getStart() ?? this.startedAtString,
alertDoc: this.fetchedAlerts.data[alert.id],
alertDoc: this.trackedAlerts.getById(alert.id),
};
}
@ -305,7 +343,12 @@ export class AlertsClient<
}
public isTrackedAlert(id: string) {
return this.legacyAlertsClient.isTrackedAlert(id);
const alert = this.trackedAlerts.getById(id);
const uuid = alert?.[ALERT_UUID];
if (uuid) {
return !!this.trackedAlerts.active[uuid];
}
return false;
}
public hasReachedAlertLimit(): boolean {
@ -424,8 +467,8 @@ export class AlertsClient<
const { rawActiveAlerts, rawRecoveredAlerts } = this.getRawAlertInstancesForState();
const activeAlerts = this.legacyAlertsClient.getProcessedAlerts('active');
const recoveredAlerts = this.legacyAlertsClient.getProcessedAlerts('recovered');
const activeAlerts = this.legacyAlertsClient.getProcessedAlerts(ALERT_STATUS_ACTIVE);
const recoveredAlerts = this.legacyAlertsClient.getProcessedAlerts(ALERT_STATUS_RECOVERED);
// TODO - Lifecycle alerts set some other fields based on alert status
// Example: workflow status - default to 'open' if not set
@ -435,17 +478,15 @@ export class AlertsClient<
for (const id of keys(rawActiveAlerts)) {
// See if there's an existing active alert document
if (activeAlerts[id]) {
if (
Object.hasOwn(this.fetchedAlerts.data, id) &&
get(this.fetchedAlerts.data[id], ALERT_STATUS) === 'active'
) {
const trackedAlert = this.trackedAlerts.get(activeAlerts[id].getUuid());
if (!!trackedAlert && get(trackedAlert, ALERT_STATUS) === ALERT_STATUS_ACTIVE) {
const isImproving = isAlertImproving<
AlertData,
LegacyState,
LegacyContext,
ActionGroupIds,
RecoveryActionGroupId
>(this.fetchedAlerts.data[id], activeAlerts[id], this.ruleType.actionGroups);
>(trackedAlert, activeAlerts[id], this.ruleType.actionGroups);
activeAlertsToIndex.push(
buildOngoingAlert<
AlertData,
@ -454,7 +495,7 @@ export class AlertsClient<
ActionGroupIds,
RecoveryActionGroupId
>({
alert: this.fetchedAlerts.data[id],
alert: trackedAlert,
legacyAlert: activeAlerts[id],
rule: this.rule,
isImproving,
@ -497,9 +538,10 @@ export class AlertsClient<
const recoveredAlertsToIndex: Array<Alert & AlertData> = [];
for (const id of keys(rawRecoveredAlerts)) {
const trackedAlert = this.trackedAlerts.getById(id);
// See if there's an existing alert document
// If there is not, log an error because there should be
if (Object.hasOwn(this.fetchedAlerts.data, id)) {
if (trackedAlert) {
recoveredAlertsToIndex.push(
recoveredAlerts[id]
? buildRecoveredAlert<
@ -509,7 +551,7 @@ export class AlertsClient<
ActionGroupIds,
RecoveryActionGroupId
>({
alert: this.fetchedAlerts.data[id],
alert: trackedAlert,
legacyAlert: recoveredAlerts[id],
rule: this.rule,
runTimestamp: this.runTimestampString,
@ -519,7 +561,7 @@ export class AlertsClient<
kibanaVersion: this.options.kibanaVersion,
})
: buildUpdatedRecoveredAlert<AlertData>({
alert: this.fetchedAlerts.data[id],
alert: trackedAlert,
legacyRawAlert: rawRecoveredAlerts[id],
runTimestamp: this.runTimestampString,
timestamp: currentTime,
@ -532,7 +574,7 @@ export class AlertsClient<
const alertsToIndex = [...activeAlertsToIndex, ...recoveredAlertsToIndex].filter(
(alert: Alert & AlertData) => {
const alertUuid = get(alert, ALERT_UUID);
const alertIndex = this.fetchedAlerts.indices[alertUuid];
const alertIndex = this.trackedAlerts.indices[alertUuid];
if (!alertIndex) {
return true;
} else if (!isValidAlertIndexName(alertIndex)) {
@ -552,9 +594,9 @@ export class AlertsClient<
return [
getBulkMeta(
alertUuid,
this.fetchedAlerts.indices[alertUuid],
this.fetchedAlerts.seqNo[alertUuid],
this.fetchedAlerts.primaryTerm[alertUuid],
this.trackedAlerts.indices[alertUuid],
this.trackedAlerts.seqNo[alertUuid],
this.trackedAlerts.primaryTerm[alertUuid],
this.isUsingDataStreams()
),
alert,
@ -592,13 +634,11 @@ export class AlertsClient<
});
}
} catch (err) {
if (isClusterBlockError(err)) {
throw err;
}
this.options.logger.error(
`Error writing ${alertsToIndex.length} alerts to ${this.indexTemplateAndPattern.alias} ${this.ruleInfoMessage} - ${err.message}`,
this.logTags
);
throw err;
}
}
@ -817,7 +857,7 @@ export class AlertsClient<
const recoveredLegacyAlerts = getRecoveredAlerts() ?? [];
return recoveredLegacyAlerts.map((alert) => ({
alert,
hit: this.fetchedAlerts.data[alert.getId()],
hit: this.trackedAlerts.get(alert.getUuid()),
}));
},
};
@ -827,6 +867,10 @@ export class AlertsClient<
return this._isUsingDataStreams;
}
// Expose the set of rule-execution uuids this client is tracking; the task
// runner reads this to persist `trackedExecutions` into task state at the
// end of a run.
public getTrackedExecutions() {
  return this.trackedExecutions;
}
private throwIfHasClusterBlockException(response: BulkResponse) {
response.items.forEach((item) => {
const op = item.create || item.index || item.update || item.delete;

View file

@ -22,6 +22,7 @@ const createLegacyAlertsClientMock = () => {
client: jest.fn(),
determineDelayedAlerts: jest.fn(),
determineFlappingAlerts: jest.fn(),
getTrackedExecutions: jest.fn(),
};
});
};

View file

@ -128,10 +128,6 @@ export class LegacyAlertsClient<
});
}
public getTrackedAlerts() {
return this.trackedAlerts;
}
public getAlert(id: string) {
return this.alertFactory?.get(id);
}
@ -285,4 +281,7 @@ export class LegacyAlertsClient<
public async setAlertStatusToUntracked() {
return;
}
// The legacy client does not track executions; return an empty set.
// Use an explicit type argument so the result is Set<string> — matching the
// IAlertsClient contract (`getTrackedExecutions(): Set<string>`) — rather
// than the Set<never> that `new Set([])` would infer.
public getTrackedExecutions() {
  return new Set<string>();
}
}

View file

@ -30,6 +30,7 @@ import {
VERSION,
ALERT_RULE_EXECUTION_TIMESTAMP,
ALERT_SEVERITY_IMPROVING,
ALERT_STATUS_ACTIVE,
} from '@kbn/rule-data-utils';
import type { DeepPartial } from '@kbn/utility-types';
import type { Alert as LegacyAlert } from '../../alert/alert';
@ -95,7 +96,7 @@ export const buildNewAlert = <
[ALERT_MAINTENANCE_WINDOW_IDS]: legacyAlert.getMaintenanceWindowIds(),
[ALERT_CONSECUTIVE_MATCHES]: legacyAlert.getActiveCount(),
[ALERT_PENDING_RECOVERED_COUNT]: legacyAlert.getPendingRecoveredCount(),
[ALERT_STATUS]: 'active',
[ALERT_STATUS]: ALERT_STATUS_ACTIVE,
[ALERT_UUID]: legacyAlert.getUuid(),
[ALERT_SEVERITY_IMPROVING]: false,
[ALERT_WORKFLOW_STATUS]: get(cleanedPayload, ALERT_WORKFLOW_STATUS, 'open'),

View file

@ -28,6 +28,7 @@ import {
ALERT_PREVIOUS_ACTION_GROUP,
ALERT_SEVERITY_IMPROVING,
ALERT_RULE_EXECUTION_UUID,
ALERT_STATUS_RECOVERED,
} from '@kbn/rule-data-utils';
import type { DeepPartial } from '@kbn/utility-types';
import { get } from 'lodash';
@ -108,7 +109,7 @@ export const buildRecoveredAlert = <
[ALERT_CONSECUTIVE_MATCHES]: legacyAlert.getActiveCount(),
[ALERT_PENDING_RECOVERED_COUNT]: legacyAlert.getPendingRecoveredCount(),
// Set status to 'recovered'
[ALERT_STATUS]: 'recovered',
[ALERT_STATUS]: ALERT_STATUS_RECOVERED,
// Set latest duration as recovered alerts should have updated duration
...(legacyAlert.getState().duration
? { [ALERT_DURATION]: nanosToMicros(legacyAlert.getState().duration) }

View file

@ -62,6 +62,7 @@ export const initializeAlertsClient = async <
state: {
alertInstances: alertRawInstances = {},
alertRecoveredInstances: alertRecoveredRawInstances = {},
trackedExecutions,
},
} = taskInstance;
@ -128,6 +129,7 @@ export const initializeAlertsClient = async <
runTimestamp,
activeAlertsFromState: alertRawInstances,
recoveredAlertsFromState: alertRecoveredRawInstances,
trackedExecutions,
});
return alertsClient;

View file

@ -102,6 +102,7 @@ export interface IAlertsClient<
> | null;
determineFlappingAlerts(): void;
determineDelayedAlerts(opts: DetermineDelayedAlertsOpts): void;
getTrackedExecutions(): Set<string>;
}
export interface ProcessAndLogAlertsOpts {
@ -130,6 +131,7 @@ export interface InitializeExecutionOpts {
flappingSettings: RulesSettingsFlappingProperties;
activeAlertsFromState: Record<string, RawAlertInstance>;
recoveredAlertsFromState: Record<string, RawAlertInstance>;
trackedExecutions?: string[];
}
export interface TrackedAlerts<

View file

@ -396,6 +396,16 @@ describe('Ad Hoc Task Runner', () => {
mockValidateRuleTypeParams.mockReturnValue(mockedAdHocRunSO.attributes.rule.params);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedAdHocRunSO);
actionsClient.bulkEnqueueExecution.mockResolvedValue({ errors: false, items: [] });
clusterClient.search.mockResolvedValue({
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 0, skipped: 0 },
hits: {
total: { relation: 'eq', value: 0 },
hits: [],
},
});
});
afterAll(() => fakeTimer.restore());

View file

@ -369,6 +369,7 @@ export const generateRunnerResult = ({
alertRecoveredInstances = {},
summaryActions = {},
taskRunError,
trackedExecutions = ['5f6aa57d-3e22-484e-bae8-cbed868f4d28'],
}: GeneratorParams = {}) => {
return {
monitoring: {
@ -402,6 +403,7 @@ export const generateRunnerResult = ({
...(state && { alertTypeState: {} }),
...(state && { previousStartedAt: new Date('1970-01-01T00:00:00.000Z').toISOString() }),
...(state && { summaryActions }),
...(state && { trackedExecutions }),
},
taskRunError,
};

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
 * Builds the list of execution uuids to persist in task state: the previously
 * tracked executions plus the current one, trimmed from the oldest end so at
 * most `limit` entries remain.
 *
 * @param trackedExecutions - execution uuids tracked so far (may be undefined
 *   on the first run, before any state has been persisted)
 * @param currentExecution - uuid of the execution that just ran
 * @param limit - maximum number of executions to keep (the flapping
 *   look-back window)
 * @returns the updated list, oldest first, length <= limit
 */
export const getTrackedExecutions = ({
  trackedExecutions,
  currentExecution,
  limit,
}: {
  // `| undefined` matches the runtime `?? []` guard below.
  trackedExecutions: Set<string> | undefined;
  currentExecution: string;
  limit: number;
}): string[] => {
  const trackedExecutionsArray = Array.from(trackedExecutions ?? []);
  trackedExecutionsArray.push(currentExecution);
  // Use `while`, not a single `shift()`: if `limit` shrinks between runs
  // (look-back window reduced), one removal per run would never bring the
  // array back under the new limit.
  while (trackedExecutionsArray.length > limit) {
    trackedExecutionsArray.shift();
  }
  return trackedExecutionsArray;
};

View file

@ -3469,7 +3469,7 @@ describe('Task Runner', () => {
});
});
test('reschedules when persistAlerts returns a cluster_block_exception', async () => {
test('reschedules when persistAlerts returns an exception', async () => {
const err = new ErrorWithType({
message: 'Index is blocked',
type: 'cluster_block_exception',
@ -3502,6 +3502,78 @@ describe('Task Runner', () => {
);
});
test('reschedules when getting tracked alerts returns an exception', async () => {
const err = new Error(GENERIC_ERROR_MESSAGE);
alertsClient.initializeExecution.mockRejectedValueOnce(err);
alertsService.createAlertsClient.mockImplementation(() => alertsClient);
const taskRunner = new TaskRunner({
ruleType,
taskInstance: mockedTaskInstance,
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
internalSavedObjectsRepository,
});
mockGetAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce(mockedRawRuleSO);
const runnerResult = await taskRunner.run();
expect(getErrorSource(runnerResult.taskRunError as Error)).toBe(TaskErrorSource.FRAMEWORK);
expect(runnerResult.state).toEqual(mockedTaskInstance.state);
expect(runnerResult.schedule!.interval).toEqual('10s');
expect(runnerResult.taskRunError).toMatchInlineSnapshot('[Error: GENERIC ERROR MESSAGE]');
expect(logger.debug).toHaveBeenCalledWith(
'ruleRunStatus for test:1: {"outcome":"failed","outcomeOrder":20,"warning":"unknown","outcomeMsg":["GENERIC ERROR MESSAGE"],"alertsCount":{}}',
{
tags: ['1', 'test'],
}
);
});
test('should remove the oldest execution id and return the tracked executions', async () => {
alertsClient.getTrackedExecutions.mockReturnValue(['1111', '2222', '3333', '4444', '5555']);
alertsClient.getProcessedAlerts.mockReturnValue({});
alertsClient.getSummarizedAlerts.mockResolvedValue({
new: {
count: 1,
data: [mockAAD],
},
ongoing: { count: 0, data: [] },
recovered: { count: 0, data: [] },
});
alertsClient.getRawAlertInstancesForState.mockResolvedValueOnce({ state: {}, meta: {} });
alertsService.createAlertsClient.mockImplementation(() => alertsClient);
rulesSettingsService.getSettings.mockResolvedValue({
flappingSettings: { ...DEFAULT_FLAPPING_SETTINGS, lookBackWindow: 5 },
queryDelaySettings: DEFAULT_QUERY_DELAY_SETTINGS,
});
alertsService.createAlertsClient.mockImplementation(() => alertsClient);
const taskRunner = new TaskRunner({
ruleType,
taskInstance: mockedTaskInstance,
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
internalSavedObjectsRepository,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
mockGetAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
const runnerResult = await taskRunner.run();
expect(runnerResult.state.trackedExecutions).toEqual([
'2222',
'3333',
'4444',
'5555',
'5f6aa57d-3e22-484e-bae8-cbed868f4d28',
]);
});
function testAlertingEventLogCalls({
ruleContext = alertingEventLoggerInitializer,
ruleTypeDef = ruleType,

View file

@ -76,6 +76,7 @@ import {
clearExpiredSnoozes,
} from './lib';
import { isClusterBlockError } from '../lib/error_with_type';
import { getTrackedExecutions } from './lib/get_tracked_execution';
const FALLBACK_RETRY_INTERVAL = '5m';
const CONNECTIVITY_RETRY_INTERVAL = '5m';
@ -448,6 +449,11 @@ export class TaskRunner<
alertInstances: alertsToReturn,
alertRecoveredInstances: recoveredAlertsToReturn,
summaryActions: actionSchedulerResult.throttledSummaryActions,
trackedExecutions: getTrackedExecutions({
trackedExecutions: alertsClient.getTrackedExecutions(),
currentExecution: this.executionId,
limit: flappingSettings.lookBackWindow,
}),
};
}

View file

@ -289,6 +289,15 @@ describe('Task Runner', () => {
});
logger.get.mockImplementation(() => logger);
ruleType.executor.mockResolvedValue({ state: {} });
clusterClient.search.mockResolvedValue({
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 0, skipped: 0 },
hits: {
total: { relation: 'eq', value: 0 },
hits: [],
},
});
});
test('should not use legacy alerts client if alerts client created', async () => {