Use msearch to fetch the alerts for maintenance windows with scoped query (#221702)

Currently we use a search with a `top_hits` aggregation to get the
alerts filtered by the scoped queries of the active maintenance
windows. But because the `top_hits` aggregation is capped at 100 hits
by default, we cannot use our max-alerts limit to fetch all the alerts
an execution can generate.

This PR replaces `search` with `msearch` and removes the aggregation
from the query, so the `maxAlerts` limit can be used as the response
size.
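
For illustration, here is a minimal sketch of the request shape this change builds for `msearch`: one empty header plus one body per maintenance window that has a scoped query, with a runtime field emitting the maintenance window ID and the response size set to the max-alerts limit. The helper `buildScopedQuerySearches` and its inputs are hypothetical; the field and query wiring mirrors `getQueryByScopedQueries` in the diff below, which also merges in the execution and rule filters.

```ts
import type {
  MsearchRequestItem,
  QueryDslQueryContainer,
} from '@elastic/elasticsearch/lib/api/types';
import { ALERT_UUID } from '@kbn/rule-data-utils';

// Runtime field name introduced in get_summarized_alerts_query.ts.
const RUNTIME_MAINTENANCE_WINDOW_ID_FIELD = 'runtime_maintenance_window_id';

interface ScopedMaintenanceWindow {
  id: string;
  // Hypothetical: assumes the scoped query has already been converted to query DSL.
  scopedQueryDsl?: QueryDslQueryContainer;
}

function buildScopedQuerySearches(
  maintenanceWindows: ScopedMaintenanceWindow[],
  maxAlertLimit: number
): MsearchRequestItem[] {
  const searches: MsearchRequestItem[] = [];
  for (const { id, scopedQueryDsl } of maintenanceWindows) {
    if (!scopedQueryDsl) {
      continue;
    }
    // msearch takes header/body pairs; an empty header reuses the top-level index.
    searches.push({});
    searches.push({
      query: scopedQueryDsl,
      runtime_mappings: {
        // Tag every hit with the maintenance window whose scoped query it matched.
        [RUNTIME_MAINTENANCE_WINDOW_ID_FIELD]: {
          type: 'keyword',
          script: { source: `emit('${id}');` },
        },
      },
      fields: [ALERT_UUID, RUNTIME_MAINTENANCE_WINDOW_ID_FIELD],
      _source: false,
      // Bounded by the max-alerts limit instead of the 100-hit cap of top_hits.
      size: maxAlertLimit,
      track_total_hits: true,
    });
  }
  return searches;
}
```

Each maintenance window with a scoped query then maps to one entry in the `msearch` responses, so a single failed search can be logged and skipped without discarding the results of the others (see the new test below).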
Ersin Erdal 2025-05-31 00:23:41 +02:00 committed by GitHub
parent f4db1ec2ad
commit 1343bfef35
7 changed files with 236 additions and 99 deletions


@@ -77,6 +77,8 @@ import { maintenanceWindowsServiceMock } from '../task_runner/maintenance_window
import { getMockMaintenanceWindow } from '../data/maintenance_window/test_helpers';
import type { KibanaRequest } from '@kbn/core/server';
import { rule } from './lib/test_fixtures';
import { RUNTIME_MAINTENANCE_WINDOW_ID_FIELD } from './lib/get_summarized_alerts_query';
import { DEFAULT_MAX_ALERTS } from '../config';
const date = '2023-03-28T22:27:28.159Z';
const startedAtDate = '2023-03-28T13:00:00.000Z';
@@ -393,6 +395,20 @@ describe('Alerts Client', () => {
hits: [],
},
});
clusterClient.msearch.mockResolvedValue({
took: 10,
responses: [
{
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 0, skipped: 0 },
hits: {
total: { relation: 'eq', value: 0 },
hits: [],
},
},
],
});
clusterClient.bulk.mockResponse({
errors: true,
took: 201,
@@ -701,6 +717,41 @@ describe('Alerts Client', () => {
});
});
test('should index new alerts even if updatePersistedAlertsWithMaintenanceWindowIds fails', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: true,
});
await alertsClient.initializeExecution(defaultExecutionOpts);
// Report 2 new alerts
const alertExecutorService = alertsClient.factory();
alertExecutorService.create('1').scheduleActions('default');
alertExecutorService.create('2').scheduleActions('default');
await alertsClient.processAlerts();
alertsClient.determineFlappingAlerts();
alertsClient.determineDelayedAlerts(determineDelayedAlertsOpts);
alertsClient.logAlerts(logAlertsOpts);
maintenanceWindowsService.getMaintenanceWindows.mockRejectedValue(
'Failed to fetch maintenance windows'
);
const result = await alertsClient.persistAlerts();
expect(logger.error).toHaveBeenCalledWith(
'Error updating maintenance window IDs:',
'Failed to fetch maintenance windows'
);
expect(result).toEqual({
alertIds: [],
maintenanceWindowIds: [],
});
});
test('should index new alerts with refresh: true in stateless', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
@@ -2221,62 +2272,52 @@ describe('Alerts Client', () => {
});
describe('getMaintenanceWindowScopedQueryAlerts', () => {
const alertWithMwId1 = {
...mockAAD,
_id: 'alert_id_1',
_source: {
...mockAAD._source,
[ALERT_UUID]: 'alert_id_1',
[ALERT_MAINTENANCE_WINDOW_IDS]: ['mw1', 'mw2'],
},
};
const alertWithMwId2 = {
...mockAAD,
_id: 'alert_id_2',
_source: {
...mockAAD._source,
[ALERT_UUID]: 'alert_id_2',
[ALERT_MAINTENANCE_WINDOW_IDS]: ['mw1'],
},
};
beforeEach(() => {
clusterClient.search.mockReturnValueOnce({
  // @ts-ignore
  hits: { total: { value: 2 }, hits: [alertWithMwId1, alertWithMwId2] },
  aggregations: {
    mw1: {
      doc_count: 2,
      alertId: {
        hits: {
          hits: [
            {
              _id: 'alert_id_1',
              _source: { [ALERT_UUID]: 'alert_id_1' },
            },
            {
              _id: 'alert_id_2',
              _source: { [ALERT_UUID]: 'alert_id_2' },
            },
          ],
        },
      },
    },
    mw2: {
      doc_count: 1,
      alertId: {
        hits: {
          hits: [
            {
              _id: 'alert_id_1',
              _source: { [ALERT_UUID]: 'alert_id_1' },
            },
          ],
        },
      },
    },
  },
});
clusterClient.msearch.mockResolvedValue({
  took: 10,
  responses: [
    {
      took: 10,
      timed_out: false,
      _shards: { failed: 0, successful: 1, total: 0, skipped: 0 },
      hits: {
        total: { relation: 'eq', value: 0 },
        hits: [
          {
            _index: '.internal.alerts-test.alerts-default-000001',
            fields: {
              [ALERT_UUID]: ['alert_id_1'],
              [RUNTIME_MAINTENANCE_WINDOW_ID_FIELD]: ['mw1'],
            },
          },
          {
            _index: '.internal.alerts-test.alerts-default-000001',
            fields: {
              [ALERT_UUID]: ['alert_id_2'],
              [RUNTIME_MAINTENANCE_WINDOW_ID_FIELD]: ['mw1'],
            },
          },
        ],
      },
    },
    {
      took: 10,
      timed_out: false,
      _shards: { failed: 0, successful: 1, total: 0, skipped: 0 },
      hits: {
        total: { relation: 'eq', value: 0 },
        hits: [
          {
            _index: '.internal.alerts-test.alerts-default-000001',
            fields: {
              [ALERT_UUID]: ['alert_id_1'],
              [RUNTIME_MAINTENANCE_WINDOW_ID_FIELD]: ['mw2'],
            },
          },
        ],
      },
    },
  ],
});
});
test('should get the persistent lifecycle alerts affected by scoped query successfully', async () => {
@@ -2286,6 +2327,17 @@ describe('Alerts Client', () => {
getParamsByMaintenanceWindowScopedQuery
);
expect(clusterClient.msearch).toHaveBeenCalledWith({
ignore_unavailable: true,
index: expect.any(String),
searches: [
{},
expect.objectContaining({ size: DEFAULT_MAX_ALERTS }),
{},
expect.objectContaining({ size: DEFAULT_MAX_ALERTS }),
],
});
expect(result).toEqual({
mw1: ['alert_id_1', 'alert_id_2'],
mw2: ['alert_id_1'],
@@ -2352,6 +2404,59 @@ describe('Alerts Client', () => {
'Must specify rule ID, space ID, and executionUuid for scoped query AAD alert query.'
);
});
test('should skip the failed request returned by msearch', async () => {
clusterClient.msearch.mockResolvedValue({
took: 10,
responses: [
{
took: 10,
timed_out: false,
_shards: { failed: 0, successful: 1, total: 0, skipped: 0 },
hits: {
total: { relation: 'eq', value: 0 },
hits: [
{
_index: '.internal.alerts-test.alerts-default-000001',
fields: {
[ALERT_UUID]: ['alert_id_1'],
[RUNTIME_MAINTENANCE_WINDOW_ID_FIELD]: ['mw1'],
},
},
{
_index: '.internal.alerts-test.alerts-default-000001',
fields: {
[ALERT_UUID]: ['alert_id_2'],
[RUNTIME_MAINTENANCE_WINDOW_ID_FIELD]: ['mw1'],
},
},
],
},
},
{
error: {
type: 'search_phase_execution_exception',
reason: 'Failed to fetch alerts for maintenance windows with scoped query',
},
status: 500,
},
],
});
const alertsClient = new AlertsClient(alertsClientParams);
// @ts-ignore
const result = await alertsClient.getMaintenanceWindowScopedQueryAlerts(
getParamsByMaintenanceWindowScopedQuery
);
expect(result).toEqual({
mw1: ['alert_id_1', 'alert_id_2'],
});
expect(logger.error).toHaveBeenCalledWith(
"Error fetching scoped query alerts for maintenance windows for test.rule-type:1 'rule-name': Failed to fetch alerts for maintenance windows with scoped query",
{ tags: ['test.rule-type', '1', 'alerts-client'] }
);
});
});
describe('updateAlertMaintenanceWindowIds', () => {


@@ -19,7 +19,11 @@ import {
ALERT_START,
} from '@kbn/rule-data-utils';
import { flatMap, get, isEmpty, keys } from 'lodash';
import type { SearchRequest } from '@elastic/elasticsearch/lib/api/types';
import type {
MsearchRequestItem,
MsearchResponseItem,
SearchRequest,
} from '@elastic/elasticsearch/lib/api/types';
import type { Alert } from '@kbn/alerts-as-data-utils';
import { DEFAULT_NAMESPACE_STRING } from '@kbn/core-saved-objects-utils-server';
import type { DeepPartial } from '@kbn/utility-types';
@@ -48,7 +52,6 @@ import type {
UpdateableAlert,
GetSummarizedAlertsParams,
GetMaintenanceWindowScopedQueryAlertsParams,
ScopedQueryAggregationResult,
} from './types';
import {
buildNewAlert,
@@ -57,7 +60,6 @@ import {
buildRecoveredAlert,
formatRule,
getHitsWithCount,
getScopedQueryHitsWithIds,
getLifecycleAlertsQueries,
getMaintenanceWindowAlertsQuery,
getContinualAlertsQuery,
@@ -71,6 +73,7 @@ import {
} from '../task_runner/maintenance_windows';
import { ErrorWithType } from '../lib/error_with_type';
import { DEFAULT_MAX_ALERTS } from '../config';
import { RUNTIME_MAINTENANCE_WINDOW_ID_FIELD } from './lib/get_summarized_alerts_query';
export interface AlertsClientParams extends CreateAlertsClientParams {
elasticsearchClientPromise: Promise<ElasticsearchClient>;
@@ -286,6 +289,22 @@ export class AlertsClient<
return { hits, total, aggregations };
}
public async msearch<Aggregation = unknown>(
searches: MsearchRequestItem[]
): Promise<Array<MsearchResponseItem<Alert & AlertData>>> {
const esClient = await this.options.elasticsearchClientPromise;
const index = this.isUsingDataStreams()
? this.indexTemplateAndPattern.alias
: this.indexTemplateAndPattern.pattern;
const { responses } = await esClient.msearch<Alert & AlertData>({
index,
searches,
ignore_unavailable: true,
});
return responses;
}
public report(
alert: ReportedAlert<
AlertData,
@@ -384,8 +403,12 @@
public async persistAlerts(): Promise<AlertsAffectedByMaintenanceWindows> {
// Persist alerts first
await this.persistAlertsHelper();
return await this.updatePersistedAlertsWithMaintenanceWindowIds();
try {
return await this.updatePersistedAlertsWithMaintenanceWindowIds();
} catch (err) {
this.options.logger.error('Error updating maintenance window IDs:', err);
return { alertIds: [], maintenanceWindowIds: [] };
}
}
public getRawAlertInstancesForState(shouldOptimizeTaskState?: boolean) {
@@ -684,7 +707,7 @@
const isLifecycleAlert = this.ruleType.autoRecoverAlerts ?? false;
const maxAlertLimit = this.legacyAlertsClient.getMaxAlertLimit();
const query = getMaintenanceWindowAlertsQuery({
const searches = getMaintenanceWindowAlertsQuery({
executionUuid,
ruleId,
maintenanceWindows,
@@ -692,9 +715,32 @@
maxAlertLimit,
});
const response = await this.search<ScopedQueryAggregationResult>(query);
return getScopedQueryHitsWithIds(response.aggregations);
const responses = await this.msearch(searches);
const alertsByMaintenanceWindowIds: ScopedQueryAlerts = {};
responses.forEach((response) => {
if ('error' in response) {
this.options.logger.error(
`Error fetching scoped query alerts for maintenance windows ${this.ruleInfoMessage}: ${response.error.reason}`,
this.logTags
);
return;
}
response.hits.hits.forEach(({ fields }) => {
if (!fields) {
return;
}
const mwIdField = fields[RUNTIME_MAINTENANCE_WINDOW_ID_FIELD];
if (!alertsByMaintenanceWindowIds[mwIdField]) {
alertsByMaintenanceWindowIds[mwIdField] = [];
}
alertsByMaintenanceWindowIds[mwIdField].push(get(fields, ALERT_UUID)[0]);
});
});
return alertsByMaintenanceWindowIds;
}
private async updateAlertMaintenanceWindowIds(idsToUpdate: string[]) {
@@ -781,7 +827,7 @@
// Get all scoped query alert IDs, returned as a record<maintenanceWindowId, alertIds>,
// indicating which alerts each maintenance window matched with its scoped query.
const aggsResult = await this.getMaintenanceWindowScopedQueryAlerts({
const alertsByMaintenanceWindowIds = await this.getMaintenanceWindowScopedQueryAlerts({
ruleId: this.options.rule.id,
spaceId: this.options.rule.spaceId,
executionUuid: this.options.rule.executionId,
@@ -791,7 +837,9 @@
const alertsAffectedByScopedQuery: string[] = [];
const appliedMaintenanceWindowIds: string[] = [];
for (const [scopedQueryMaintenanceWindowId, alertIds] of Object.entries(aggsResult)) {
for (const [scopedQueryMaintenanceWindowId, alertIds] of Object.entries(
alertsByMaintenanceWindowIds
)) {
// Go through matched alerts, find the in memory object
alertIds.forEach((alertId) => {
const newAlert = newAlerts.find((alert) => alert.getUuid() === alertId);


@@ -9,7 +9,7 @@ import type {
QueryDslQueryContainer,
SearchRequest,
SearchTotalHits,
AggregationsAggregationContainer,
MsearchRequestItem,
} from '@elastic/elasticsearch/lib/api/types';
import type { BoolQuery } from '@kbn/es-query';
import {
@@ -34,10 +34,9 @@ import type {
GetQueryByTimeRangeParams,
GetQueryByScopedQueriesParams,
GetMaintenanceWindowAlertsQueryParams,
ScopedQueryAggregationResult,
SearchResult,
} from '../types';
import type { SummarizedAlertsChunk, ScopedQueryAlerts } from '../..';
import type { SummarizedAlertsChunk } from '../..';
import type { FormatAlert } from '../../types';
import { expandFlattenedAlert } from './format_alert';
import { injectAnalyzeWildcard } from './inject_analyze_wildcard';
@@ -48,6 +47,8 @@ enum AlertTypes {
RECOVERED,
}
export const RUNTIME_MAINTENANCE_WINDOW_ID_FIELD = 'runtime_maintenance_window_id';
const getLifecycleAlertsQueryByExecutionUuid = ({
executionUuid,
ruleId,
@@ -293,7 +294,7 @@ export const getQueryByScopedQueries = ({
action,
maintenanceWindows,
maxAlertLimit,
}: GetQueryByScopedQueriesParams): SearchRequest => {
}: GetQueryByScopedQueriesParams): MsearchRequestItem[] => {
const filters: QueryDslQueryContainer[] = [
{
term: {
@@ -315,7 +316,7 @@ });
});
}
const aggs: Record<string, AggregationsAggregationContainer> = {};
const searches: MsearchRequestItem[] = [];
maintenanceWindows.forEach(({ id, scopedQuery }) => {
if (!scopedQuery) {
@@ -331,31 +332,30 @@ }
}
)[0] as { bool: BoolQuery };
aggs[id] = {
  filter: {
    bool: {
      ...scopedQueryFilter.bool,
      filter: [...(scopedQueryFilter.bool?.filter || []), ...filters],
    },
  },
  aggs: {
    alertId: {
      top_hits: {
        size: 100,
        _source: {
          includes: [ALERT_UUID],
        },
      },
    },
  },
};
searches.push({});
searches.push({
  query: {
    bool: {
      ...scopedQueryFilter.bool,
      filter: [...(scopedQueryFilter.bool?.filter || []), ...filters],
    },
  },
  runtime_mappings: {
    [RUNTIME_MAINTENANCE_WINDOW_ID_FIELD]: {
      script: {
        source: `emit('${id}');`,
      },
      type: 'keyword',
    },
  },
  fields: [ALERT_UUID, RUNTIME_MAINTENANCE_WINDOW_ID_FIELD],
  _source: false,
  size: maxAlertLimit,
  track_total_hits: true,
});
});
return {
  size: 0,
  track_total_hits: true,
  aggs: { ...aggs },
};
return searches;
};
const generateAlertsFilterDSL = (
@@ -450,20 +450,6 @@ const getHitsWithCount = <AlertData extends RuleAlertData>(
};
};
const getScopedQueryHitsWithIds = <AlertData extends RuleAlertData>(
aggregationsResult: SearchResult<AlertData, ScopedQueryAggregationResult>['aggregations']
): ScopedQueryAlerts => {
return Object.entries(aggregationsResult || {}).reduce<ScopedQueryAlerts>(
(result, [maintenanceWindowId, aggregation]) => {
result[maintenanceWindowId] = (aggregation.alertId?.hits?.hits || []).map(
(hit) => hit._source[ALERT_UUID]
);
return result;
},
{}
);
};
const getLifecycleAlertsQueries = ({
executionUuid,
start,
@@ -534,7 +520,7 @@ const getMaintenanceWindowAlertsQuery = ({
action,
maintenanceWindows,
maxAlertLimit,
}: GetMaintenanceWindowAlertsQueryParams): SearchRequest => {
}: GetMaintenanceWindowAlertsQueryParams): MsearchRequestItem[] => {
return getQueryByScopedQueries({
executionUuid,
ruleId,
@@ -549,5 +535,4 @@ export {
getLifecycleAlertsQueries,
getContinualAlertsQuery,
getMaintenanceWindowAlertsQuery,
getScopedQueryHitsWithIds,
};


@@ -15,7 +15,6 @@ export {
getLifecycleAlertsQueries,
getContinualAlertsQuery,
getMaintenanceWindowAlertsQuery,
getScopedQueryHitsWithIds,
} from './get_summarized_alerts_query';
export { expandFlattenedAlert } from './format_alert';
export { sanitizeBulkErrorResponse } from './sanitize_bulk_response';


@@ -334,7 +334,7 @@ export function createTestConfig(name: string, options: CreateTestConfigOptions)
'--xpack.alerting.invalidateApiKeysTask.removalDelay="1s"',
'--xpack.alerting.healthCheck.interval="1s"',
'--xpack.alerting.rules.minimumScheduleInterval.value="1s"',
'--xpack.alerting.rules.run.alerts.max=20',
'--xpack.alerting.rules.run.alerts.max=110',
`--xpack.alerting.rules.run.actions.connectorTypeOverrides=${JSON.stringify([
{ id: 'test.capped', max: '1' },
])}`,


@@ -306,7 +306,7 @@ function getExceedsAlertLimitRuleType() {
limit = services.alertFactory.alertLimit.getValue();
}
const alertsToCreate = limit ? limit : 25;
const alertsToCreate = limit ? limit : 115;
range(alertsToCreate)
.map(() => uuidv4())


@@ -107,7 +107,7 @@ export default function maxAlertsRuleTests({ getService }: FtrProviderContext) {
const hit = results.body.hits.hits[0];
expect(hit._source).to.eql({
numAlerts: 20,
numAlerts: 110,
});
});
@@ -149,7 +149,7 @@
const hit = results.body.hits.hits[0];
expect(hit._source).to.eql({
numAlerts: 20,
numAlerts: 110,
});
});