[ML] Alerts as data integration for Anomaly Detection rule type (#166349)

## Summary

Part of #165958

Replaces usage of the deprecated `alertFactory` with the new alerts
client and adds alerts-as-data integration for Anomaly Detection
alerting rule type.

Alert instances are stored in
`.alerts-ml.anomaly-detection.alerts-default` index and extends the
common `AlertSchema`.

<details>
  <summary>Result mappings</summary>
  
  ```json
{
  ".internal.alerts-ml.anomaly-detection.alerts-default-000001": {
    "mappings": {
      "dynamic": "false",
      "_meta": {
        "namespace": "default",
        "kibana": {
          "version": "8.11.0"
        },
        "managed": true
      },
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "event": {
          "properties": {
            "action": {
              "type": "keyword"
            },
            "kind": {
              "type": "keyword"
            }
          }
        },
        "kibana": {
          "properties": {
            "alert": {
              "properties": {
                "action_group": {
                  "type": "keyword"
                },
                "anomaly_score": {
                  "type": "double"
                },
                "anomaly_timestamp": {
                  "type": "date"
                },
                "case_ids": {
                  "type": "keyword"
                },
                "duration": {
                  "properties": {
                    "us": {
                      "type": "long"
                    }
                  }
                },
                "end": {
                  "type": "date"
                },
                "flapping": {
                  "type": "boolean"
                },
                "flapping_history": {
                  "type": "boolean"
                },
                "instance": {
                  "properties": {
                    "id": {
                      "type": "keyword"
                    }
                  }
                },
                "is_interim": {
                  "type": "boolean"
                },
                "job_id": {
                  "type": "keyword"
                },
                "last_detected": {
                  "type": "date"
                },
                "maintenance_window_ids": {
                  "type": "keyword"
                },
                "reason": {
                  "type": "keyword"
                },
                "rule": {
                  "properties": {
                    "category": {
                      "type": "keyword"
                    },
                    "consumer": {
                      "type": "keyword"
                    },
                    "execution": {
                      "properties": {
                        "uuid": {
                          "type": "keyword"
                        }
                      }
                    },
                    "name": {
                      "type": "keyword"
                    },
                    "parameters": {
                      "type": "flattened",
                      "ignore_above": 4096
                    },
                    "producer": {
                      "type": "keyword"
                    },
                    "revision": {
                      "type": "long"
                    },
                    "rule_type_id": {
                      "type": "keyword"
                    },
                    "tags": {
                      "type": "keyword"
                    },
                    "uuid": {
                      "type": "keyword"
                    }
                  }
                },
                "start": {
                  "type": "date"
                },
                "status": {
                  "type": "keyword"
                },
                "time_range": {
                  "type": "date_range",
                  "format": "epoch_millis||strict_date_optional_time"
                },
                "top_influencers": {
                  "type": "nested",
                  "dynamic": "false",
                  "properties": {
                    "influencer_field_name": {
                      "type": "keyword"
                    },
                    "influencer_field_value": {
                      "type": "keyword"
                    },
                    "influencer_score": {
                      "type": "double"
                    },
                    "initial_influencer_score": {
                      "type": "double"
                    },
                    "is_interim": {
                      "type": "boolean"
                    },
                    "job_id": {
                      "type": "keyword"
                    },
                    "timestamp": {
                      "type": "date"
                    }
                  }
                },
                "top_records": {
                  "type": "nested",
                  "dynamic": "false",
                  "properties": {
                    "actual": {
                      "type": "double"
                    },
                    "by_field_name": {
                      "type": "keyword"
                    },
                    "by_field_value": {
                      "type": "keyword"
                    },
                    "detector_index": {
                      "type": "integer"
                    },
                    "field_name": {
                      "type": "keyword"
                    },
                    "function": {
                      "type": "keyword"
                    },
                    "initial_record_score": {
                      "type": "double"
                    },
                    "is_interim": {
                      "type": "boolean"
                    },
                    "job_id": {
                      "type": "keyword"
                    },
                    "over_field_name": {
                      "type": "keyword"
                    },
                    "over_field_value": {
                      "type": "keyword"
                    },
                    "partition_field_name": {
                      "type": "keyword"
                    },
                    "partition_field_value": {
                      "type": "keyword"
                    },
                    "record_score": {
                      "type": "double"
                    },
                    "timestamp": {
                      "type": "date"
                    },
                    "typical": {
                      "type": "double"
                    }
                  }
                },
                "url": {
                  "type": "keyword",
                  "index": false,
                  "ignore_above": 2048
                },
                "uuid": {
                  "type": "keyword"
                },
                "workflow_status": {
                  "type": "keyword"
                },
                "workflow_tags": {
                  "type": "keyword"
                }
              }
            },
            "space_ids": {
              "type": "keyword"
            },
            "version": {
              "type": "version"
            }
          }
        },
        "tags": {
          "type": "keyword"
        }
      }
    }
  }
}
  ```
</details>

### Checklist

- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
This commit is contained in:
Dima Arnautov 2023-09-28 15:46:42 +02:00 committed by GitHub
parent 67cc63affe
commit 3ad5addd89
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 512 additions and 64 deletions

View file

@ -34,6 +34,7 @@ export interface EcsMetadata {
scaling_factor?: number;
short: string;
type: string;
properties?: Record<string, { type: string }>;
}
export interface FieldMap {
@ -50,5 +51,6 @@ export interface FieldMap {
path?: string;
scaling_factor?: number;
dynamic?: boolean | 'strict';
properties?: Record<string, { type: string }>;
};
}

View file

@ -198,6 +198,7 @@ const generateSchemaLines = ({
break;
case 'float':
case 'integer':
case 'double':
lineWriter.addLine(`${keyToWrite}: ${getSchemaDefinition('schemaNumber', isArray)},`);
break;
case 'boolean':

View file

@ -0,0 +1,120 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
// ---------------------------------- WARNING ----------------------------------
// this file was generated, and should not be edited by hand
// ---------------------------------- WARNING ----------------------------------
import * as rt from 'io-ts';
import { Either } from 'fp-ts/lib/Either';
import { AlertSchema } from './alert_schema';
const ISO_DATE_PATTERN = /^d{4}-d{2}-d{2}Td{2}:d{2}:d{2}.d{3}Z$/;
export const IsoDateString = new rt.Type<string, string, unknown>(
'IsoDateString',
rt.string.is,
(input, context): Either<rt.Errors, string> => {
if (typeof input === 'string' && ISO_DATE_PATTERN.test(input)) {
return rt.success(input);
} else {
return rt.failure(input, context);
}
},
rt.identity
);
export type IsoDateStringC = typeof IsoDateString;
export const schemaDate = IsoDateString;
export const schemaDateArray = rt.array(IsoDateString);
export const schemaDateRange = rt.partial({
gte: schemaDate,
lte: schemaDate,
});
export const schemaDateRangeArray = rt.array(schemaDateRange);
export const schemaUnknown = rt.unknown;
export const schemaUnknownArray = rt.array(rt.unknown);
export const schemaString = rt.string;
export const schemaStringArray = rt.array(schemaString);
export const schemaNumber = rt.number;
export const schemaNumberArray = rt.array(schemaNumber);
export const schemaStringOrNumber = rt.union([schemaString, schemaNumber]);
export const schemaStringOrNumberArray = rt.array(schemaStringOrNumber);
export const schemaBoolean = rt.boolean;
export const schemaBooleanArray = rt.array(schemaBoolean);
const schemaGeoPointCoords = rt.type({
type: schemaString,
coordinates: schemaNumberArray,
});
const schemaGeoPointString = schemaString;
const schemaGeoPointLatLon = rt.type({
lat: schemaNumber,
lon: schemaNumber,
});
const schemaGeoPointLocation = rt.type({
location: schemaNumberArray,
});
const schemaGeoPointLocationString = rt.type({
location: schemaString,
});
export const schemaGeoPoint = rt.union([
schemaGeoPointCoords,
schemaGeoPointString,
schemaGeoPointLatLon,
schemaGeoPointLocation,
schemaGeoPointLocationString,
]);
export const schemaGeoPointArray = rt.array(schemaGeoPoint);
// prettier-ignore
const MlAnomalyDetectionAlertRequired = rt.type({
kibana: rt.type({
alert: rt.type({
job_id: schemaString,
}),
}),
});
const MlAnomalyDetectionAlertOptional = rt.partial({
kibana: rt.partial({
alert: rt.partial({
anomaly_score: schemaNumber,
anomaly_timestamp: schemaDate,
is_interim: schemaBoolean,
top_influencers: rt.array(
rt.partial({
influencer_field_name: schemaString,
influencer_field_value: schemaString,
influencer_score: schemaNumber,
initial_influencer_score: schemaNumber,
is_interim: schemaBoolean,
job_id: schemaString,
timestamp: schemaDate,
})
),
top_records: rt.array(
rt.partial({
actual: schemaNumber,
by_field_name: schemaString,
by_field_value: schemaString,
detector_index: schemaNumber,
field_name: schemaString,
function: schemaString,
initial_record_score: schemaNumber,
is_interim: schemaBoolean,
job_id: schemaString,
over_field_name: schemaString,
over_field_value: schemaString,
partition_field_name: schemaString,
partition_field_value: schemaString,
record_score: schemaNumber,
timestamp: schemaDate,
typical: schemaNumber,
})
),
}),
}),
});
// prettier-ignore
export const MlAnomalyDetectionAlertSchema = rt.intersection([MlAnomalyDetectionAlertRequired, MlAnomalyDetectionAlertOptional, AlertSchema]);
// prettier-ignore
export type MlAnomalyDetectionAlert = rt.TypeOf<typeof MlAnomalyDetectionAlertSchema>;

View file

@ -13,6 +13,7 @@ import type { ObservabilityMetricsAlert } from './generated/observability_metric
import type { ObservabilitySloAlert } from './generated/observability_slo_schema';
import type { ObservabilityUptimeAlert } from './generated/observability_uptime_schema';
import type { SecurityAlert } from './generated/security_schema';
import type { MlAnomalyDetectionAlert } from './generated/ml_anomaly_detection_schema';
export * from './create_schema_from_field_map';
@ -24,6 +25,7 @@ export type { ObservabilitySloAlert } from './generated/observability_slo_schema
export type { ObservabilityUptimeAlert } from './generated/observability_uptime_schema';
export type { SecurityAlert } from './generated/security_schema';
export type { StackAlert } from './generated/stack_schema';
export type { MlAnomalyDetectionAlert } from './generated/ml_anomaly_detection_schema';
export type AADAlert =
| Alert
@ -32,4 +34,5 @@ export type AADAlert =
| ObservabilityMetricsAlert
| ObservabilitySloAlert
| ObservabilityUptimeAlert
| SecurityAlert;
| SecurityAlert
| MlAnomalyDetectionAlert;

View file

@ -9,7 +9,7 @@ import { capitalize } from 'lodash';
export const contextToSchemaName = (context: string) => {
return `${context
.split('.')
.split(/[.\-]/)
.map((part: string) => capitalize(part))
.join('')}Alert`;
};

View file

@ -44,6 +44,35 @@ interface BaseAnomalyAlertDoc {
unique_key: string;
}
export interface TopRecordAADDoc {
job_id: string;
record_score: number;
initial_record_score: number;
timestamp: number;
is_interim: boolean;
function: string;
field_name?: string;
by_field_name?: string;
by_field_value?: string | number;
over_field_name?: string;
over_field_value?: string | number;
partition_field_name?: string;
partition_field_value?: string | number;
typical: number[];
actual: number[];
detector_index: number;
}
export interface TopInfluencerAADDoc {
job_id: string;
influencer_score: number;
initial_influencer_score: number;
is_interim: boolean;
timestamp: number;
influencer_field_name: string;
influencer_field_value: string | number;
}
export interface RecordAnomalyAlertDoc extends BaseAnomalyAlertDoc {
result_type: typeof ML_ANOMALY_RESULT_TYPE.RECORD;
function: string;

View file

@ -9,7 +9,7 @@ import Boom from '@hapi/boom';
import { i18n } from '@kbn/i18n';
import rison from '@kbn/rison';
import type { Duration } from 'moment/moment';
import { memoize } from 'lodash';
import { memoize, pick } from 'lodash';
import {
FIELD_FORMAT_IDS,
type IFieldFormat,
@ -24,6 +24,7 @@ import {
ML_ANOMALY_RESULT_TYPE,
} from '@kbn/ml-anomaly-utils';
import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common';
import { ALERT_REASON, ALERT_URL } from '@kbn/rule-data-utils';
import type { MlClient } from '../ml_client';
import type {
MlAnomalyDetectionAlertParams,
@ -36,8 +37,13 @@ import type {
PreviewResultsKeys,
RecordAnomalyAlertDoc,
TopHitsResultsKeys,
TopInfluencerAADDoc,
TopRecordAADDoc,
} from '../../../common/types/alerts';
import type { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type';
import type {
AnomalyDetectionAlertContext,
AnomalyDetectionAlertPayload,
} from './register_anomaly_detection_alert_type';
import { resolveMaxTimeInterval } from '../../../common/util/job_utils';
import { getTopNBuckets, resolveLookbackInterval } from '../../../common/util/alerts';
import type { DatafeedsService } from '../../models/job_service/datafeeds';
@ -391,12 +397,89 @@ export function alertingServiceProvider(
return alertInstanceKey;
};
/**
* Returns a callback for formatting elasticsearch aggregation response
* to the alert-as-data document.
* @param resultType
*/
const getResultsToPayloadFormatter = (
resultType: MlAnomalyResultType,
useInitialScore: boolean = false
) => {
const resultsLabel = getAggResultsLabel(resultType);
return (
v: AggResultsResponse
): Omit<AnomalyDetectionAlertPayload, typeof ALERT_URL> | undefined => {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
if (aggTypeResults.doc_count === 0) {
return;
}
const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;
const topAnomaly = requestedAnomalies[0];
const timestamp = topAnomaly._source.timestamp;
return {
[ALERT_REASON]: i18n.translate(
'xpack.ml.alertTypes.anomalyDetectionAlertingRule.alertMessage',
{
defaultMessage:
'Alerts are raised based on real-time scores. Remember that scores may be adjusted over time as data continues to be analyzed.',
}
),
job_id: [...new Set(requestedAnomalies.map((h) => h._source.job_id))][0],
is_interim: requestedAnomalies.some((h) => h._source.is_interim),
anomaly_timestamp: timestamp,
anomaly_score: topAnomaly._source[getScoreFields(resultType, useInitialScore)],
top_records: v.record_results.top_record_hits.hits.hits.map((h) => {
const { actual, typical } = getTypicalAndActualValues(h._source);
return pick<RecordAnomalyAlertDoc>(
{
...h._source,
typical,
actual,
},
[
'job_id',
'record_score',
'initial_record_score',
'detector_index',
'is_interim',
'timestamp',
'partition_field_name',
'partition_field_value',
'function',
'actual',
'typical',
]
) as TopRecordAADDoc;
}) as TopRecordAADDoc[],
top_influencers: v.influencer_results.top_influencer_hits.hits.hits.map((influencerDoc) => {
return pick<InfluencerAnomalyAlertDoc>(
{
...influencerDoc._source,
},
[
'job_id',
'influencer_field_name',
'influencer_field_value',
'influencer_score',
'initial_influencer_score',
'is_interim',
'timestamp',
]
) as TopInfluencerAADDoc;
}) as TopInfluencerAADDoc[],
};
};
};
/**
* Returns a callback for formatting elasticsearch aggregation response
* to the alert context.
* @param resultType
*/
const getResultsFormatter = (
const getResultsToContextFormatter = (
resultType: MlAnomalyResultType,
useInitialScore: boolean = false,
formatters: FieldFormatters
@ -468,7 +551,7 @@ export function alertingServiceProvider(
* @param previewTimeInterval - Relative time interval to test the alert condition
* @param checkIntervalGap - Interval between alert executions
*/
const fetchAnomalies = async (
const fetchPreviewResults = async (
params: MlAnomalyDetectionAlertParams,
previewTimeInterval?: string,
checkIntervalGap?: Duration
@ -570,7 +653,7 @@ export function alertingServiceProvider(
const fieldsFormatters = await getFormatters(datafeeds![0]!.indices[0]);
const formatter = getResultsFormatter(
const formatter = getResultsToContextFormatter(
params.resultType,
!!previewTimeInterval,
fieldsFormatters
@ -660,7 +743,7 @@ export function alertingServiceProvider(
*/
const fetchResult = async (
params: AnomalyESQueryParams
): Promise<AlertExecutionResult | undefined> => {
): Promise<AggResultsResponse | undefined> => {
const {
resultType,
jobIds,
@ -670,7 +753,6 @@ export function alertingServiceProvider(
anomalyScoreField,
includeInterimResults,
anomalyScoreThreshold,
indexPattern,
} = params;
const requestBody = {
@ -761,9 +843,44 @@ export function alertingServiceProvider(
prev.max_score.value > current.max_score.value ? prev : current
);
return topResult;
};
const getFormatted = async (
indexPattern: string,
resultType: MlAnomalyDetectionAlertParams['resultType'],
spaceId: string,
value: AggResultsResponse
): Promise<
| { payload: AnomalyDetectionAlertPayload; context: AnomalyDetectionAlertContext; name: string }
| undefined
> => {
const formatters = await getFormatters(indexPattern);
return getResultsFormatter(params.resultType, false, formatters)(topResult);
const context = getResultsToContextFormatter(resultType, false, formatters)(value);
const payload = getResultsToPayloadFormatter(resultType, false)(value);
if (!context || !payload) return;
const anomalyExplorerUrl = buildExplorerUrl(
context.jobIds,
{ from: context.bucketRange.start, to: context.bucketRange.end },
resultType,
spaceId,
context
);
return {
payload: {
...payload,
[ALERT_URL]: anomalyExplorerUrl,
},
context: {
...context,
anomalyExplorerUrl,
},
name: context.alertInstanceKey,
};
};
return {
@ -777,7 +894,13 @@ export function alertingServiceProvider(
params: MlAnomalyDetectionAlertParams,
spaceId: string
): Promise<
{ context: AnomalyDetectionAlertContext; name: string; isHealthy: boolean } | undefined
| {
payload: AnomalyDetectionAlertPayload;
context: AnomalyDetectionAlertContext;
name: string;
isHealthy: boolean;
}
| undefined
> => {
const queryParams = await getQueryParams(params);
@ -787,50 +910,57 @@ export function alertingServiceProvider(
const result = await fetchResult(queryParams);
if (result) {
const anomalyExplorerUrl = buildExplorerUrl(
result.jobIds,
{ from: result.bucketRange.start, to: result.bucketRange.end },
params.resultType,
spaceId,
result
const formattedResult = result
? await getFormatted(queryParams.indexPattern, queryParams.resultType, spaceId, result)
: undefined;
if (!formattedResult) {
// If no anomalies found, report as recovered.
const url = buildExplorerUrl(
queryParams.jobIds,
{
from: `now-${queryParams.lookBackTimeInterval}`,
to: 'now',
mode: 'relative',
},
queryParams.resultType,
spaceId
);
const executionResult = {
...result,
anomalyExplorerUrl,
};
const message = i18n.translate(
'xpack.ml.alertTypes.anomalyDetectionAlertingRule.recoveredMessage',
{
defaultMessage:
'No anomalies have been found in the past {lookbackInterval} that exceed the severity threshold of {severity}.',
values: {
severity: queryParams.anomalyScoreThreshold,
lookbackInterval: queryParams.lookBackTimeInterval,
},
}
);
return { context: executionResult, name: result.alertInstanceKey, isHealthy: false };
return {
name: '',
isHealthy: true,
payload: {
[ALERT_URL]: url,
[ALERT_REASON]: message,
job_id: queryParams.jobIds[0],
},
context: {
anomalyExplorerUrl: url,
jobIds: queryParams.jobIds,
message,
} as AnomalyDetectionAlertContext,
};
}
return {
name: '',
isHealthy: true,
context: {
anomalyExplorerUrl: buildExplorerUrl(
queryParams.jobIds,
{
from: `now-${queryParams.lookBackTimeInterval}`,
to: 'now',
mode: 'relative',
},
queryParams.resultType,
spaceId
),
jobIds: queryParams.jobIds,
message: i18n.translate(
'xpack.ml.alertTypes.anomalyDetectionAlertingRule.recoveredMessage',
{
defaultMessage:
'No anomalies have been found in the past {lookbackInterval} that exceed the severity threshold of {severity}.',
values: {
severity: queryParams.anomalyScoreThreshold,
lookbackInterval: queryParams.lookBackTimeInterval,
},
}
),
} as AnomalyDetectionAlertContext,
context: formattedResult.context,
payload: formattedResult.payload,
name: formattedResult.name,
isHealthy: false,
};
},
/**
@ -844,16 +974,16 @@ export function alertingServiceProvider(
timeRange,
sampleSize,
}: MlAnomalyDetectionAlertPreviewRequest): Promise<PreviewResponse> => {
const res = await fetchAnomalies(alertParams, timeRange);
const previewResults = await fetchPreviewResults(alertParams, timeRange);
if (!res) {
if (!previewResults) {
throw Boom.notFound(`No results found`);
}
return {
// sum of all alert responses within the time range
count: res.length,
results: res.slice(0, sampleSize),
count: previewResults.length,
results: previewResults.slice(0, sampleSize),
};
},
};

View file

@ -11,8 +11,15 @@ import type {
ActionGroup,
AlertInstanceContext,
AlertInstanceState,
RuleTypeParams,
RuleTypeState,
RecoveredActionGroupId,
} from '@kbn/alerting-plugin/common';
import { IRuleTypeAlerts, RuleExecutorOptions } from '@kbn/alerting-plugin/server';
import { ALERT_NAMESPACE, ALERT_REASON, ALERT_URL } from '@kbn/rule-data-utils';
import { MlAnomalyDetectionAlert } from '@kbn/alerts-as-data-utils';
import { ES_FIELD_TYPES } from '@kbn/field-types';
import { expandFlattenedAlert } from '@kbn/alerting-plugin/server/alerts_client/lib';
import { ML_ALERT_TYPES } from '../../../common/constants/alerts';
import { PLUGIN_ID } from '../../../common/constants/app';
import { MINIMUM_FULL_LICENSE } from '../../../common/license';
@ -36,6 +43,19 @@ export type AnomalyDetectionAlertBaseContext = AlertInstanceContext & {
message: string;
};
// Flattened alert payload for alert-as-data
export type AnomalyDetectionAlertPayload = {
job_id: string;
anomaly_score?: number;
is_interim?: boolean;
anomaly_timestamp?: number;
top_records?: any;
top_influencers?: any;
} & {
[ALERT_URL]: string;
[ALERT_REASON]: string;
};
export type AnomalyDetectionAlertContext = AnomalyDetectionAlertBaseContext & {
timestampIso8601: string;
timestamp: number;
@ -45,10 +65,88 @@ export type AnomalyDetectionAlertContext = AnomalyDetectionAlertBaseContext & {
topInfluencers?: InfluencerAnomalyAlertDoc[];
};
export type ExecutorOptions<P extends RuleTypeParams> = RuleExecutorOptions<
P,
RuleTypeState,
{},
AnomalyDetectionAlertContext,
typeof ANOMALY_SCORE_MATCH_GROUP_ID,
MlAnomalyDetectionAlert
>;
export const ANOMALY_SCORE_MATCH_GROUP_ID = 'anomaly_score_match';
export type AnomalyScoreMatchGroupId = typeof ANOMALY_SCORE_MATCH_GROUP_ID;
export const ANOMALY_DETECTION_AAD_INDEX_NAME = 'ml.anomaly-detection';
const ML_ALERT_NAMESPACE = ALERT_NAMESPACE;
export const ALERT_ANOMALY_DETECTION_JOB_ID = `${ML_ALERT_NAMESPACE}.job_id` as const;
export const ALERT_ANOMALY_SCORE = `${ML_ALERT_NAMESPACE}.anomaly_score` as const;
export const ALERT_ANOMALY_IS_INTERIM = `${ML_ALERT_NAMESPACE}.is_interim` as const;
export const ALERT_ANOMALY_TIMESTAMP = `${ML_ALERT_NAMESPACE}.anomaly_timestamp` as const;
export const ALERT_TOP_RECORDS = `${ML_ALERT_NAMESPACE}.top_records` as const;
export const ALERT_TOP_INFLUENCERS = `${ML_ALERT_NAMESPACE}.top_influencers` as const;
export const ANOMALY_DETECTION_AAD_CONFIG: IRuleTypeAlerts<MlAnomalyDetectionAlert> = {
context: ANOMALY_DETECTION_AAD_INDEX_NAME,
mappings: {
fieldMap: {
[ALERT_ANOMALY_DETECTION_JOB_ID]: {
type: ES_FIELD_TYPES.KEYWORD,
array: false,
required: true,
},
[ALERT_ANOMALY_SCORE]: { type: ES_FIELD_TYPES.DOUBLE, array: false, required: false },
[ALERT_ANOMALY_IS_INTERIM]: { type: ES_FIELD_TYPES.BOOLEAN, array: false, required: false },
[ALERT_ANOMALY_TIMESTAMP]: { type: ES_FIELD_TYPES.DATE, array: false, required: false },
[ALERT_TOP_RECORDS]: {
type: ES_FIELD_TYPES.OBJECT,
array: true,
required: false,
dynamic: false,
properties: {
job_id: { type: ES_FIELD_TYPES.KEYWORD },
record_score: { type: ES_FIELD_TYPES.DOUBLE },
initial_record_score: { type: ES_FIELD_TYPES.DOUBLE },
detector_index: { type: ES_FIELD_TYPES.INTEGER },
is_interim: { type: ES_FIELD_TYPES.BOOLEAN },
timestamp: { type: ES_FIELD_TYPES.DATE },
partition_field_name: { type: ES_FIELD_TYPES.KEYWORD },
partition_field_value: { type: ES_FIELD_TYPES.KEYWORD },
over_field_name: { type: ES_FIELD_TYPES.KEYWORD },
over_field_value: { type: ES_FIELD_TYPES.KEYWORD },
by_field_name: { type: ES_FIELD_TYPES.KEYWORD },
by_field_value: { type: ES_FIELD_TYPES.KEYWORD },
function: { type: ES_FIELD_TYPES.KEYWORD },
typical: { type: ES_FIELD_TYPES.DOUBLE },
actual: { type: ES_FIELD_TYPES.DOUBLE },
field_name: { type: ES_FIELD_TYPES.KEYWORD },
},
},
[ALERT_TOP_INFLUENCERS]: {
type: ES_FIELD_TYPES.OBJECT,
array: true,
required: false,
dynamic: false,
properties: {
job_id: { type: ES_FIELD_TYPES.KEYWORD },
influencer_field_name: { type: ES_FIELD_TYPES.KEYWORD },
influencer_field_value: { type: ES_FIELD_TYPES.KEYWORD },
influencer_score: { type: ES_FIELD_TYPES.DOUBLE },
initial_influencer_score: { type: ES_FIELD_TYPES.DOUBLE },
is_interim: { type: ES_FIELD_TYPES.BOOLEAN },
timestamp: { type: ES_FIELD_TYPES.DATE },
},
},
},
},
shouldWrite: true,
};
export const THRESHOLD_MET_GROUP: ActionGroup<AnomalyScoreMatchGroupId> = {
id: ANOMALY_SCORE_MATCH_GROUP_ID,
name: i18n.translate('xpack.ml.anomalyDetectionAlert.actionGroupName', {
@ -66,7 +164,9 @@ export function registerAnomalyDetectionAlertType({
RuleTypeState,
AlertInstanceState,
AnomalyDetectionAlertContext,
AnomalyScoreMatchGroupId
AnomalyScoreMatchGroupId,
RecoveredActionGroupId,
MlAnomalyDetectionAlert
>({
id: ML_ALERT_TYPES.ANOMALY_DETECTION,
name: i18n.translate('xpack.ml.anomalyDetectionAlert.name', {
@ -140,29 +240,62 @@ export function registerAnomalyDetectionAlertType({
minimumLicenseRequired: MINIMUM_FULL_LICENSE,
isExportable: true,
doesSetRecoveryContext: true,
async executor({ services, params, spaceId }) {
executor: async ({
services,
params,
spaceId,
}: ExecutorOptions<MlAnomalyDetectionAlertParams>) => {
const fakeRequest = {} as KibanaRequest;
const { execute } = mlSharedServices.alertingServiceProvider(
services.savedObjectsClient,
fakeRequest
);
const { alertsClient } = services;
if (!alertsClient) return { state: {} };
const executionResult = await execute(params, spaceId);
if (executionResult && !executionResult.isHealthy) {
const alertInstanceName = executionResult.name;
const alertInstance = services.alertFactory.create(alertInstanceName);
alertInstance.scheduleActions(ANOMALY_SCORE_MATCH_GROUP_ID, executionResult.context);
if (!executionResult) return { state: {} };
const { isHealthy, name, context, payload } = executionResult;
if (!isHealthy) {
alertsClient.report({
id: name,
actionGroup: ANOMALY_SCORE_MATCH_GROUP_ID,
context,
payload: expandFlattenedAlert({
[ALERT_URL]: payload[ALERT_URL],
[ALERT_REASON]: payload[ALERT_REASON],
[ALERT_ANOMALY_DETECTION_JOB_ID]: payload.job_id,
[ALERT_ANOMALY_SCORE]: payload.anomaly_score,
[ALERT_ANOMALY_IS_INTERIM]: payload.is_interim,
[ALERT_ANOMALY_TIMESTAMP]: payload.anomaly_timestamp,
[ALERT_TOP_RECORDS]: payload.top_records,
[ALERT_TOP_INFLUENCERS]: payload.top_influencers,
}),
});
}
// Set context for recovered alerts
const { getRecoveredAlerts } = services.alertFactory.done();
for (const recoveredAlert of getRecoveredAlerts()) {
if (!!executionResult?.isHealthy) {
recoveredAlert.setContext(executionResult.context);
for (const recoveredAlert of alertsClient.getRecoveredAlerts()) {
if (isHealthy) {
const alertId = recoveredAlert.alert.getId();
alertsClient.setAlertData({
id: alertId,
context,
payload: expandFlattenedAlert({
[ALERT_URL]: payload[ALERT_URL],
[ALERT_REASON]: payload[ALERT_REASON],
[ALERT_ANOMALY_DETECTION_JOB_ID]: payload.job_id,
}),
});
}
}
return { state: {} };
},
alerts: ANOMALY_DETECTION_AAD_CONFIG,
});
}

View file

@ -106,5 +106,7 @@
"@kbn/react-kibana-mount",
"@kbn/core-http-browser",
"@kbn/data-view-editor-plugin",
"@kbn/rule-data-utils",
"@kbn/alerts-as-data-utils",
],
}

View file

@ -23,6 +23,8 @@ const ES_TEST_INDEX_SOURCE = 'ml-alert:anomaly-detection';
const ES_TEST_INDEX_REFERENCE = '-na-';
const ES_TEST_OUTPUT_INDEX_NAME = `${ES_TEST_INDEX_NAME}-ad-alert-output`;
const AAD_INDEX = '.alerts-ml.anomaly-detection.alerts-default';
const ALERT_INTERVAL_SECONDS = 3;
const AD_JOB_ID = 'rt-anomaly-mean-value';
@ -144,6 +146,18 @@ export default function alertTests({ getService }: FtrProviderContext) {
'/s/space1/app/ml/explorer/?_g=(ml%3A(jobIds%3A!(rt-anomaly-mean-value))'
);
}
log.debug('Checking docs in the alerts-as-data index...');
const aadDocs = await waitForAAD(1);
for (const doc of aadDocs) {
const { job_id: jobId, url } = doc._source.kibana.alert;
expect(jobId).to.be(AD_JOB_ID);
expect(url).to.contain(
'/s/space1/app/ml/explorer/?_g=(ml%3A(jobIds%3A!(rt-anomaly-mean-value))'
);
}
});
async function waitForDocs(count: number): Promise<any[]> {
@ -154,6 +168,20 @@ export default function alertTests({ getService }: FtrProviderContext) {
);
}
async function waitForAAD(numDocs: number): Promise<any[]> {
return await retry.try(async () => {
const searchResult = await es.search({ index: AAD_INDEX, size: 1000 });
// @ts-expect-error doesn't handle total: number
const value = searchResult.hits.total.value?.value || searchResult.hits.total.value;
if (value < numDocs) {
// @ts-expect-error doesn't handle total: number
throw new Error(`Expected ${numDocs} but received ${searchResult.hits.total.value}.`);
}
return searchResult.hits.hits;
});
}
async function createAlert({
name,
...params

View file

@ -71,7 +71,7 @@ export default function checkAlertSchemasTest({ getService }: FtrProviderContext
createSchemaFromFieldMap({
outputFile: `schemas/generated/${alertsDefinition.context.replaceAll(
'.',
/[.\-]/g,
'_'
)}_schema.ts`,
fieldMap: alertsDefinition.mappings.fieldMap,