Mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 09:48:58 -04:00)
[ML] Prevent duplicate notifications about the same anomaly result (#91485)
* [ML] check kibana event logs for existing alert instance
* [ML] create alert instance key, add check for alert id
* [ML] use anomaly_utils, check interval gap
* [ML] add detector index
* [ML] fix unit test
* [ML] include detector_index into source
parent 06b8fb44d4
commit c84047bd36

9 changed files with 150 additions and 39 deletions
@@ -15,7 +15,7 @@ export type TopHitsResultsKeys = 'top_record_hits' | 'top_bucket_hits' | 'top_in
 export interface AlertExecutionResult {
   count: number;
   key: number;
   key_as_string: string;
+  alertInstanceKey: string;
   isInterim: boolean;
   jobIds: string[];
   timestamp: number;
@@ -47,10 +47,13 @@ interface BaseAnomalyAlertDoc {
 export interface RecordAnomalyAlertDoc extends BaseAnomalyAlertDoc {
   result_type: typeof ANOMALY_RESULT_TYPE.RECORD;
   function: string;
-  field_name: string;
-  by_field_value: string | number;
-  over_field_value: string | number;
-  partition_field_value: string | number;
+  field_name?: string;
+  by_field_name?: string;
+  by_field_value?: string | number;
+  over_field_name?: string;
+  over_field_value?: string | number;
+  partition_field_name?: string;
+  partition_field_value?: string | number;
 }

 export interface BucketAnomalyAlertDoc extends BaseAnomalyAlertDoc {
@@ -230,8 +230,6 @@ export function getEntityFieldName(record: AnomalyRecordDoc): string | undefined
   if (record.partition_field_name !== undefined) {
     return record.partition_field_name;
   }
-
-  return undefined;
 }

 // Returns the value of the field to use as the entity value from the source record
@@ -249,8 +247,6 @@ export function getEntityFieldValue(record: AnomalyRecordDoc): string | number |
   if (record.partition_field_value !== undefined) {
     return record.partition_field_value;
   }
-
-  return undefined;
 }

 // Returns the list of partitioning entity fields for the source record as a list
@@ -5,10 +5,10 @@
  * 2.0.
  */

-import { resolveTimeInterval } from './alerting_service';
+import { resolveBucketSpanInSeconds } from './alerting_service';

 describe('Alerting Service', () => {
   test('should resolve maximum bucket interval', () => {
-    expect(resolveTimeInterval(['15m', '1h', '6h', '90s'])).toBe('43200s');
+    expect(resolveBucketSpanInSeconds(['15m', '1h', '6h', '90s'])).toBe(43200);
   });
 });
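The updated expectation follows directly from the doubling rule implemented further down: the longest span in ['15m', '1h', '6h', '90s'] is 6h = 21600 seconds, and 21600 * 2 = 43200, so the helper now returns the number 43200 rather than the string '43200s'.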
@@ -7,6 +7,9 @@

 import Boom from '@hapi/boom';
 import rison from 'rison-node';
+import { ElasticsearchClient } from 'kibana/server';
+import moment from 'moment';
+import { Duration } from 'moment/moment';
 import { MlClient } from '../ml_client';
 import {
   MlAnomalyDetectionAlertParams,
@@ -25,6 +28,8 @@ import {
 import { parseInterval } from '../../../common/util/parse_interval';
 import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type';
 import { MlJobsResponse } from '../../../common/types/job_service';
+import { ANOMALY_SCORE_MATCH_GROUP_ID } from '../../../common/constants/alerts';
+import { getEntityFieldName, getEntityFieldValue } from '../../../common/util/anomaly_utils';

 function isDefined<T>(argument: T | undefined | null): argument is T {
   return argument !== undefined && argument !== null;
@@ -34,22 +39,23 @@ function isDefined<T>(argument: T | undefined | null): argument is T {
  * Resolves the longest bucket span from the list and multiply it by 2.
  * @param bucketSpans Collection of bucket spans
  */
-export function resolveTimeInterval(bucketSpans: string[]): string {
-  return `${
+export function resolveBucketSpanInSeconds(bucketSpans: string[]): number {
+  return (
     Math.max(
       ...bucketSpans
        .map((b) => parseInterval(b))
        .filter(isDefined)
        .map((v) => v.asSeconds())
     ) * 2
-  }s`;
+  );
 }

 /**
  * Alerting related server-side methods
  * @param mlClient
+ * @param esClient
  */
-export function alertingServiceProvider(mlClient: MlClient) {
+export function alertingServiceProvider(mlClient: MlClient, esClient: ElasticsearchClient) {
   const getAggResultsLabel = (resultType: AnomalyResultType) => {
     return {
       aggGroupLabel: `${resultType}_results` as PreviewResultsKeys,
@@ -177,10 +183,14 @@ export function alertingServiceProvider(mlClient: MlClient) {
             'is_interim',
             'function',
             'field_name',
+            'by_field_name',
             'by_field_value',
+            'over_field_name',
             'over_field_value',
+            'partition_field_name',
             'partition_field_value',
             'job_id',
+            'detector_index',
           ],
         },
         size: 3,
@@ -257,14 +267,31 @@ export function alertingServiceProvider(mlClient: MlClient) {
     };
   };

+  /**
+   * Provides unique key for the anomaly result.
+   */
+  const getAlertInstanceKey = (source: any): string => {
+    let alertInstanceKey = `${source.job_id}_${source.timestamp}`;
+    if (source.result_type === ANOMALY_RESULT_TYPE.INFLUENCER) {
+      alertInstanceKey += `_${source.influencer_field_name}_${source.influencer_field_value}`;
+    } else if (source.result_type === ANOMALY_RESULT_TYPE.RECORD) {
+      const fieldName = getEntityFieldName(source);
+      const fieldValue = getEntityFieldValue(source);
+      alertInstanceKey += `_${source.detector_index}_${source.function}_${fieldName}_${fieldValue}`;
+    }
+    return alertInstanceKey;
+  };
+
   /**
    * Builds a request body
-   * @param params
-   * @param previewTimeInterval
+   * @param params - Alert params
+   * @param previewTimeInterval - Relative time interval to test the alert condition
+   * @param checkIntervalGap - Interval between alert executions
    */
   const fetchAnomalies = async (
     params: MlAnomalyDetectionAlertParams,
-    previewTimeInterval?: string
+    previewTimeInterval?: string,
+    checkIntervalGap?: Duration
   ): Promise<AlertExecutionResult[] | undefined> => {
     const jobAndGroupIds = [
       ...(params.jobSelection.jobIds ?? []),
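The key concatenates the job, the bucket timestamp and, for record results, the detector, function and entity pair, so repeated executions that surface the same anomaly produce the same key. A small illustration (job id, timestamp and entity values are invented for this example, not taken from the commit):

// hypothetical record anomaly source
const source = {
  result_type: 'record',
  job_id: 'cpu_job',
  timestamp: 1613410800000,
  detector_index: 0,
  function: 'mean',
  partition_field_name: 'host',
  partition_field_value: 'web-1',
};
// getAlertInstanceKey(source) === 'cpu_job_1613410800000_0_mean_host_web-1'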
@@ -281,9 +308,14 @@ export function alertingServiceProvider(mlClient: MlClient) {
       return;
     }

-    const lookBackTimeInterval = resolveTimeInterval(
-      jobsResponse.map((v) => v.analysis_config.bucket_span)
-    );
+    /**
+     * The check interval might be bigger than the 2x bucket span.
+     * We need to check the biggest time range to make sure anomalies are not missed.
+     */
+    const lookBackTimeInterval = `${Math.max(
+      resolveBucketSpanInSeconds(jobsResponse.map((v) => v.analysis_config.bucket_span)),
+      checkIntervalGap ? checkIntervalGap.asSeconds() : 0
+    )}s`;

     const jobIds = jobsResponse.map((v) => v.job_id);
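To make the Math.max concrete, using the bucket spans from the unit test above (the gap values are illustrative only):

// doubled longest bucket span: resolveBucketSpanInSeconds(['15m', '1h', '6h', '90s']) === 43200
Math.max(43200, 600);   // 43200 -> lookBackTimeInterval '43200s' (rule checked every 10 minutes)
Math.max(43200, 86400); // 86400 -> lookBackTimeInterval '86400s' (rule last executed a day ago)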
@@ -370,19 +402,22 @@ export function alertingServiceProvider(mlClient: MlClient) {
         const aggTypeResults = v[resultsLabel.aggGroupLabel];
         const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;

+        const topAnomaly = requestedAnomalies[0];
+        const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);
+
         return {
           count: aggTypeResults.doc_count,
           key: v.key,
           key_as_string: v.key_as_string,
+          alertInstanceKey,
           jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
           isInterim: requestedAnomalies.some((h) => h._source.is_interim),
-          timestamp: requestedAnomalies[0]._source.timestamp,
-          timestampIso8601: requestedAnomalies[0].fields.timestamp_iso8601[0],
-          timestampEpoch: requestedAnomalies[0].fields.timestamp_epoch[0],
-          score: requestedAnomalies[0].fields.score[0],
+          timestamp: topAnomaly._source.timestamp,
+          timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
+          timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
+          score: topAnomaly.fields.score[0],
           bucketRange: {
-            start: requestedAnomalies[0].fields.start[0],
-            end: requestedAnomalies[0].fields.end[0],
+            start: topAnomaly.fields.start[0],
+            end: topAnomaly.fields.end[0],
           },
           topRecords: v.record_results.top_record_hits.hits.hits.map((h) => ({
             ...h._source,
@@ -479,13 +514,24 @@ export function alertingServiceProvider(mlClient: MlClient) {
     /**
      * Return the result of an alert condition execution.
      *
-     * @param params
+     * @param params - Alert params
      * @param publicBaseUrl
+     * @param alertId - Alert ID
+     * @param startedAt
+     * @param previousStartedAt
      */
     execute: async (
       params: MlAnomalyDetectionAlertParams,
-      publicBaseUrl: string | undefined
+      publicBaseUrl: string | undefined,
+      alertId: string,
+      startedAt: Date,
+      previousStartedAt: Date | null
     ): Promise<AnomalyDetectionAlertContext | undefined> => {
-      const res = await fetchAnomalies(params);
+      const checkIntervalGap = previousStartedAt
+        ? moment.duration(moment(startedAt).diff(previousStartedAt))
+        : undefined;
+
+      const res = await fetchAnomalies(params, undefined, checkIntervalGap);
+
       if (!res) {
         throw new Error('No results found');
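A quick sketch of what the new checkIntervalGap evaluates to (dates are invented for illustration):

import moment from 'moment';

// previous execution 15 minutes before the current one
const previousStartedAt = new Date('2021-02-15T10:00:00Z');
const startedAt = new Date('2021-02-15T10:15:00Z');
const checkIntervalGap = moment.duration(moment(startedAt).diff(previousStartedAt));
checkIntervalGap.asSeconds(); // 900, so the look-back window can never be shorter than the execution gap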
@@ -496,12 +542,65 @@ export function alertingServiceProvider(mlClient: MlClient) {

       const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType as AnomalyResultType);

-      return {
+      const executionResult = {
         ...result,
-        name: result.key_as_string,
+        name: result.alertInstanceKey,
         anomalyExplorerUrl,
         kibanaBaseUrl: publicBaseUrl!,
       };
+
+      let kibanaEventLogCount = 0;
+      try {
+        // Check kibana-event-logs for presence of this alert instance
+        const kibanaLogResults = await esClient.count({
+          index: '.kibana-event-log-*',
+          body: {
+            query: {
+              bool: {
+                must: [
+                  {
+                    term: {
+                      'kibana.alerting.action_group_id': {
+                        value: ANOMALY_SCORE_MATCH_GROUP_ID,
+                      },
+                    },
+                  },
+                  {
+                    term: {
+                      'kibana.alerting.instance_id': {
+                        value: executionResult.name,
+                      },
+                    },
+                  },
+                  {
+                    nested: {
+                      path: 'kibana.saved_objects',
+                      query: {
+                        term: {
+                          'kibana.saved_objects.id': {
+                            value: alertId,
+                          },
+                        },
+                      },
+                    },
+                  },
+                ],
+              },
+            },
+          },
+        });
+
+        kibanaEventLogCount = kibanaLogResults.body.count;
+      } catch (e) {
+        // eslint-disable-next-line no-console
+        console.log('Unable to check kibana event logs', e);
+      }
+
+      if (kibanaEventLogCount > 0) {
+        return;
+      }
+
+      return executionResult;
     },
     /**
      * Checks how often the alert condition will fire an alert instance
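Condensed into a standalone helper, the new guard reads roughly as below. The helper name and signature are illustrative only; the index, query clauses and count check mirror the hunk above: a non-zero count means this rule already fired the anomaly-score action group for the same instance key, so the executor returns nothing and no duplicate notification is scheduled.

import { ElasticsearchClient } from 'kibana/server';

// illustrative helper, not part of the commit: true when the rule identified by alertId
// has already reported the alert instance with the given name
async function alreadyNotified(
  esClient: ElasticsearchClient,
  alertId: string,
  instanceName: string,
  actionGroupId: string
): Promise<boolean> {
  const { body } = await esClient.count({
    index: '.kibana-event-log-*',
    body: {
      query: {
        bool: {
          must: [
            { term: { 'kibana.alerting.action_group_id': { value: actionGroupId } } },
            { term: { 'kibana.alerting.instance_id': { value: instanceName } } },
            {
              nested: {
                path: 'kibana.saved_objects',
                query: { term: { 'kibana.saved_objects.id': { value: alertId } } },
              },
            },
          ],
        },
      },
    },
  });
  return body.count > 0;
}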
@@ -123,13 +123,19 @@ export function registerAnomalyDetectionAlertType({
     },
     producer: PLUGIN_ID,
     minimumLicenseRequired: MINIMUM_FULL_LICENSE,
-    async executor({ services, params }) {
+    async executor({ services, params, alertId, state, previousStartedAt, startedAt }) {
       const fakeRequest = {} as KibanaRequest;
       const { execute } = mlSharedServices.alertingServiceProvider(
         services.savedObjectsClient,
         fakeRequest
       );
-      const executionResult = await execute(params, publicBaseUrl);
+      const executionResult = await execute(
+        params,
+        publicBaseUrl,
+        alertId,
+        startedAt,
+        previousStartedAt
+      );

       if (executionResult) {
         const alertInstanceName = executionResult.name;
@@ -5,12 +5,14 @@
  * 2.0.
  */

+import { Logger } from 'kibana/server';
 import { AlertingPlugin } from '../../../../alerts/server';
 import { registerAnomalyDetectionAlertType } from './register_anomaly_detection_alert_type';
 import { SharedServices } from '../../shared_services';

 export interface RegisterAlertParams {
   alerts: AlertingPlugin['setup'];
+  logger: Logger;
   mlSharedServices: SharedServices;
   publicBaseUrl: string | undefined;
 }
@@ -211,6 +211,7 @@ export class MlServerPlugin
     if (plugins.alerts) {
       registerMlAlerts({
         alerts: plugins.alerts,
+        logger: this.log,
         mlSharedServices: sharedServices,
         publicBaseUrl: coreSetup.http.basePath.publicBaseUrl,
       });
@@ -30,9 +30,9 @@ export function alertingRoutes({ router, routeGuard }: RouteInitialization) {
         tags: ['access:ml:canGetJobs'],
       },
     },
-    routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response }) => {
+    routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response, client }) => {
       try {
-        const alertingService = alertingServiceProvider(mlClient);
+        const alertingService = alertingServiceProvider(mlClient, client.asInternalUser);

         const result = await alertingService.preview(request.body);

@@ -20,7 +20,9 @@ export function getAlertingServiceProvider(getGuards: GetGuards) {
       return await getGuards(request, savedObjectsClient)
         .isFullLicense()
         .hasMlCapabilities(['canGetJobs'])
-        .ok(({ mlClient }) => alertingServiceProvider(mlClient).preview(...args));
+        .ok(({ mlClient, scopedClient }) =>
+          alertingServiceProvider(mlClient, scopedClient.asInternalUser).preview(...args)
+        );
     },
     execute: async (
       ...args: Parameters<MlAlertingService['execute']>
@@ -28,7 +30,9 @@ export function getAlertingServiceProvider(getGuards: GetGuards) {
       return await getGuards(request, savedObjectsClient)
         .isFullLicense()
        .hasMlCapabilities(['canGetJobs'])
-        .ok(({ mlClient }) => alertingServiceProvider(mlClient).execute(...args));
+        .ok(({ mlClient, scopedClient }) =>
+          alertingServiceProvider(mlClient, scopedClient.asInternalUser).execute(...args)
+        );
     },
   };
 },