[RAC][Rule Registry] Paginate results for fetching existing alerts (#122474)

* [RAC][Rule Registry] Paginate results for fetching existing alerts

* Switch to lodash `difference` for computing new alert IDs, which improves performance by 2 seconds for 50K alerts

* Changing the pagination to break up the request into 10K chunks

* Updating NOTICE.txt per CI instructions

* Changing warning message to debug

* Prefix log message with [Rule Registry]
This commit is contained in:
Chris Cowan 2022-01-12 13:10:48 -07:00 committed by liza-mae
parent bada57f2ef
commit 0236c8ae42
2 changed files with 88 additions and 39 deletions

View file

@ -10,6 +10,7 @@ import type { PublicContract } from '@kbn/utility-types';
import { getOrElse } from 'fp-ts/lib/Either';
import * as rt from 'io-ts';
import { v4 } from 'uuid';
import { difference } from 'lodash';
import {
AlertExecutorOptions,
AlertInstance,
@ -24,7 +25,6 @@ import {
ALERT_DURATION,
ALERT_END,
ALERT_INSTANCE_ID,
ALERT_RULE_UUID,
ALERT_START,
ALERT_STATUS,
ALERT_STATUS_ACTIVE,
@ -39,6 +39,7 @@ import {
} from '../../common/technical_rule_data_field_names';
import { IRuleDataClient } from '../rule_data_client';
import { AlertExecutorOptionsWithExtraServices } from '../types';
import { fetchExistingAlerts } from './fetch_existing_alerts';
import {
CommonAlertFieldName,
CommonAlertIdFieldName,
@ -179,13 +180,13 @@ export const createLifecycleExecutor =
const currentAlertIds = Object.keys(currentAlerts);
const trackedAlertIds = Object.keys(state.trackedAlerts);
const newAlertIds = currentAlertIds.filter((alertId) => !trackedAlertIds.includes(alertId));
const newAlertIds = difference(currentAlertIds, trackedAlertIds);
const allAlertIds = [...new Set(currentAlertIds.concat(trackedAlertIds))];
const trackedAlertStates = Object.values(state.trackedAlerts);
logger.debug(
`Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)`
`[Rule Registry] Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)`
);
const trackedAlertsDataMap: Record<
@ -194,40 +195,14 @@ export const createLifecycleExecutor =
> = {};
if (trackedAlertStates.length) {
const { hits } = await ruleDataClient.getReader().search({
body: {
query: {
bool: {
filter: [
{
term: {
[ALERT_RULE_UUID]: commonRuleFields[ALERT_RULE_UUID],
},
},
{
terms: {
[ALERT_UUID]: trackedAlertStates.map(
(trackedAlertState) => trackedAlertState.alertUuid
),
},
},
],
},
},
size: trackedAlertStates.length,
collapse: {
field: ALERT_UUID,
},
sort: {
[TIMESTAMP]: 'desc' as const,
},
},
allow_no_indices: true,
});
hits.hits.forEach((hit) => {
const alertId = hit._source[ALERT_INSTANCE_ID];
if (alertId) {
const result = await fetchExistingAlerts(
ruleDataClient,
trackedAlertStates,
commonRuleFields
);
result.forEach((hit) => {
const alertId = hit._source ? hit._source[ALERT_INSTANCE_ID] : void 0;
if (alertId && hit._source) {
trackedAlertsDataMap[alertId] = {
indexName: hit._index,
fields: hit._source,
@ -242,7 +217,7 @@ export const createLifecycleExecutor =
const currentAlertData = currentAlerts[alertId];
if (!alertData) {
logger.warn(`Could not find alert data for ${alertId}`);
logger.debug(`[Rule Registry] Could not find alert data for ${alertId}`);
}
const isNew = !state.trackedAlerts[alertId];
@ -291,7 +266,7 @@ export const createLifecycleExecutor =
const writeAlerts = ruleDataClient.isWriteEnabled() && shouldWriteAlerts();
if (allEventsToIndex.length > 0 && writeAlerts) {
logger.debug(`Preparing to index ${allEventsToIndex.length} alerts.`);
logger.debug(`[Rule Registry] Preparing to index ${allEventsToIndex.length} alerts.`);
await ruleDataClient.getWriter().bulk({
body: allEventsToIndex.flatMap(({ event, indexName }) => [

View file

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { chunk } from 'lodash';
import { PublicContract } from '@kbn/utility-types';
import { IRuleDataClient } from '../rule_data_client';
import {
ALERT_RULE_UUID,
ALERT_UUID,
TIMESTAMP,
} from '../../common/technical_rule_data_field_names';
// Maximum number of tracked alert states sent in a single search request;
// larger inputs are split into chunks of this size by fetchExistingAlerts.
// NOTE(review): 10000 presumably mirrors Elasticsearch's default
// `index.max_result_window` — confirm before changing.
const CHUNK_SIZE = 10000;
/**
 * Minimal shape of a tracked alert as consumed by this module.
 *
 * Only `alertUuid` is read here (it is matched against the ALERT_UUID field
 * when searching); the other fields are carried along from the caller.
 * NOTE(review): presumably mirrors the tracked-alert state kept by the
 * lifecycle executor — confirm the two definitions stay in sync.
 */
interface TrackedAlertState {
// Identifier of the alert (NOTE(review): presumably the alert instance id — confirm).
alertId: string;
// Value matched against the ALERT_UUID field of stored alert documents.
alertUuid: string;
// NOTE(review): presumably a timestamp string of when the alert started — confirm format.
started: string;
}
// Public contract of IRuleDataClient; this module only uses `getReader().search(...)` on it.
type RuleDataClient = PublicContract<IRuleDataClient>;
/**
 * Issues one search for the stored alert documents that belong to the given
 * tracked alert states.
 *
 * The query filters on the rule UUID taken from `commonRuleFields` and on the
 * UUIDs of the supplied states. Results are sorted by TIMESTAMP descending and
 * collapsed on ALERT_UUID, and the requested size equals the number of states.
 *
 * @param ruleDataClient - client whose reader executes the search
 * @param states - tracked alert states whose `alertUuid` values are looked up
 * @param commonRuleFields - object providing the ALERT_RULE_UUID value to filter on
 * @returns the raw `hits.hits` array of the search response
 */
const fetchAlertsForStates = async (
  ruleDataClient: RuleDataClient,
  states: TrackedAlertState[],
  commonRuleFields: any
) => {
  // Restrict the search to documents written by this rule...
  const ruleClause = {
    term: {
      [ALERT_RULE_UUID]: commonRuleFields[ALERT_RULE_UUID],
    },
  };
  // ...and to the alert UUIDs we are currently tracking.
  const alertClause = {
    terms: {
      [ALERT_UUID]: states.map(({ alertUuid }) => alertUuid),
    },
  };

  // The request is cast to `any`, exactly as before, so the reader's typings
  // are not enforced on this literal.
  const searchRequest = {
    body: {
      query: {
        bool: {
          filter: [ruleClause, alertClause],
        },
      },
      size: states.length,
      collapse: {
        field: ALERT_UUID,
      },
      sort: {
        [TIMESTAMP]: 'desc' as const,
      },
    },
    allow_no_indices: true,
  } as any;

  const reader = ruleDataClient.getReader();
  const response = await reader.search(searchRequest);
  return response.hits.hits;
};
/**
 * Fetches the stored alert documents for all tracked alert states.
 *
 * The states are split into chunks of at most CHUNK_SIZE entries, one search
 * is started per chunk, all searches are awaited together, and their hits are
 * flattened into a single array (in chunk order).
 *
 * @param ruleDataClient - client used to run each search
 * @param trackedAlertStates - every tracked alert state to look up
 * @param commonRuleFields - object providing the rule UUID used as a filter
 * @returns a flat array containing the hits of every chunked search
 */
export const fetchExistingAlerts = async (
  ruleDataClient: RuleDataClient,
  trackedAlertStates: TrackedAlertState[],
  commonRuleFields: any
) => {
  const batches = chunk(trackedAlertStates, CHUNK_SIZE);

  // Start every batch's search before awaiting any of them.
  const pendingSearches = batches.map((batch) =>
    fetchAlertsForStates(ruleDataClient, batch, commonRuleFields)
  );

  const hitsPerBatch = await Promise.all(pendingSearches);
  return hitsPerBatch.flat();
};