[7.x] [Metrics UI] Add preview feature for metric threshold alerts (#67684) (#69183)

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>

Zacqary Adam Xeper 2020-06-16 13:26:13 -05:00 committed by GitHub
parent 70d071b31f
commit d97841416c
13 changed files with 985 additions and 309 deletions


@@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export * from './types';
export const INFRA_ALERT_PREVIEW_PATH = '/api/infra/alerting/preview';
export const TOO_MANY_BUCKETS_PREVIEW_EXCEPTION = 'TOO_MANY_BUCKETS_PREVIEW_EXCEPTION';
export interface TooManyBucketsPreviewExceptionMetadata {
TOO_MANY_BUCKETS_PREVIEW_EXCEPTION: any;
maxBuckets: number;
}
export const isTooManyBucketsPreviewException = (
value: any
): value is TooManyBucketsPreviewExceptionMetadata =>
Boolean(value && value.TOO_MANY_BUCKETS_PREVIEW_EXCEPTION);
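
A minimal usage sketch, not part of this diff, showing how a caller can branch on the isTooManyBucketsPreviewException type guard; the value shown is a hand-built example of the metadata shape defined above, and the import path is illustrative:

import {
  isTooManyBucketsPreviewException,
  TOO_MANY_BUCKETS_PREVIEW_EXCEPTION,
} from './index'; // illustrative import path

// A hand-built example value: either ordinary preview data or the metadata object above.
const value: unknown = { [TOO_MANY_BUCKETS_PREVIEW_EXCEPTION]: true, maxBuckets: 10000 };

if (isTooManyBucketsPreviewException(value)) {
  // Narrowed to TooManyBucketsPreviewExceptionMetadata here.
  console.log(`Preview exceeded the limit of ${value.maxBuckets} buckets`);
} else {
  console.log('Ordinary preview value');
}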


@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import * as rt from 'io-ts';
// TODO: Have threshold and inventory alerts import these types from this file instead of from their
// local directories
export const METRIC_THRESHOLD_ALERT_TYPE_ID = 'metrics.alert.threshold';
export const METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID = 'metrics.alert.inventory.threshold';
export enum Comparator {
GT = '>',
LT = '<',
GT_OR_EQ = '>=',
LT_OR_EQ = '<=',
BETWEEN = 'between',
OUTSIDE_RANGE = 'outside',
}
export enum Aggregators {
COUNT = 'count',
AVERAGE = 'avg',
SUM = 'sum',
MIN = 'min',
MAX = 'max',
RATE = 'rate',
CARDINALITY = 'cardinality',
P95 = 'p95',
P99 = 'p99',
}
// Alert Preview API
const baseAlertRequestParamsRT = rt.intersection([
rt.partial({
filterQuery: rt.union([rt.string, rt.undefined]),
sourceId: rt.string,
}),
rt.type({
lookback: rt.union([rt.literal('h'), rt.literal('d'), rt.literal('w'), rt.literal('M')]),
criteria: rt.array(rt.any),
alertInterval: rt.string,
}),
]);
const metricThresholdAlertPreviewRequestParamsRT = rt.intersection([
baseAlertRequestParamsRT,
rt.partial({
groupBy: rt.union([rt.string, rt.array(rt.string), rt.undefined]),
}),
rt.type({
alertType: rt.literal(METRIC_THRESHOLD_ALERT_TYPE_ID),
}),
]);
export type MetricThresholdAlertPreviewRequestParams = rt.TypeOf<
typeof metricThresholdAlertPreviewRequestParamsRT
>;
const inventoryAlertPreviewRequestParamsRT = rt.intersection([
baseAlertRequestParamsRT,
rt.type({
nodeType: rt.string,
alertType: rt.literal(METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID),
}),
]);
export const alertPreviewRequestParamsRT = rt.union([
metricThresholdAlertPreviewRequestParamsRT,
inventoryAlertPreviewRequestParamsRT,
]);
export const alertPreviewSuccessResponsePayloadRT = rt.type({
numberOfGroups: rt.number,
resultTotals: rt.type({
fired: rt.number,
noData: rt.number,
error: rt.number,
tooManyBuckets: rt.number,
}),
});
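
The codecs above double as compile-time types and runtime validators. A minimal sketch, not part of this diff, of decoding an incoming preview request with alertPreviewRequestParamsRT; the import path is illustrative, and fp-ts is assumed to be available alongside io-ts as it is elsewhere in Kibana:

import { isRight } from 'fp-ts/lib/Either';
// Illustrative import path for the codecs defined above.
import {
  alertPreviewRequestParamsRT,
  METRIC_THRESHOLD_ALERT_TYPE_ID,
} from '../../common/alerting/metrics';

const body: unknown = {
  lookback: 'h',
  criteria: [{ aggType: 'avg', metric: 'system.cpu.user.pct', threshold: [0.9] }],
  alertInterval: '1m',
  alertType: METRIC_THRESHOLD_ALERT_TYPE_ID,
  groupBy: 'host.name',
};

const decoded = alertPreviewRequestParamsRT.decode(body);
if (isRight(decoded)) {
  // decoded.right is the union of the metric threshold and inventory preview params.
  console.log('valid preview request for', decoded.right.alertType);
} else {
  console.log('invalid preview request body');
}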


@@ -4,25 +4,36 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { debounce } from 'lodash';
import { debounce, pick } from 'lodash';
import * as rt from 'io-ts';
import { HttpSetup } from 'src/core/public';
import React, { ChangeEvent, useCallback, useMemo, useEffect, useState } from 'react';
import {
EuiSpacer,
EuiText,
EuiFormRow,
EuiButton,
EuiButtonEmpty,
EuiCheckbox,
EuiToolTip,
EuiIcon,
EuiFieldSearch,
EuiSelect,
EuiFlexGroup,
EuiFlexItem,
} from '@elastic/eui';
import { FormattedMessage } from '@kbn/i18n/react';
import { i18n } from '@kbn/i18n';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { getIntervalInSeconds } from '../../../../server/utils/get_interval_in_seconds';
import {
Comparator,
Aggregators,
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
} from '../../../../server/lib/alerting/metric_threshold/types';
INFRA_ALERT_PREVIEW_PATH,
alertPreviewRequestParamsRT,
alertPreviewSuccessResponsePayloadRT,
METRIC_THRESHOLD_ALERT_TYPE_ID,
} from '../../../../common/alerting/metrics';
import {
ForLastExpression,
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
@@ -40,6 +51,7 @@ import { convertKueryToElasticSearchQuery } from '../../../utils/kuery';
import { ExpressionRow } from './expression_row';
import { AlertContextMeta, TimeUnit, MetricExpression } from '../types';
import { ExpressionChart } from './expression_chart';
import { validateMetricThreshold } from './validation';
const FILTER_TYPING_DEBOUNCE_MS = 500;
@@ -54,6 +66,7 @@ interface Props {
alertOnNoData?: boolean;
};
alertsContext: AlertsContextValue<AlertContextMeta>;
alertInterval: string;
setAlertParams(key: string, value: any): void;
setAlertProperty(key: string, value: any): void;
}
@@ -66,8 +79,24 @@ const defaultExpression = {
timeUnit: 'm',
} as MetricExpression;
async function getAlertPreview({
fetch,
params,
}: {
fetch: HttpSetup['fetch'];
params: rt.TypeOf<typeof alertPreviewRequestParamsRT>;
}): Promise<rt.TypeOf<typeof alertPreviewSuccessResponsePayloadRT>> {
return await fetch(`${INFRA_ALERT_PREVIEW_PATH}`, {
method: 'POST',
body: JSON.stringify({
...params,
alertType: METRIC_THRESHOLD_ALERT_TYPE_ID,
}),
});
}
export const Expressions: React.FC<Props> = (props) => {
const { setAlertParams, alertParams, errors, alertsContext } = props;
const { setAlertParams, alertParams, errors, alertsContext, alertInterval } = props;
const { source, createDerivedIndexPattern } = useSourceViaHttp({
sourceId: 'default',
type: 'metrics',
@@ -75,6 +104,13 @@ export const Expressions: React.FC<Props> = (props) => {
toastWarning: alertsContext.toastNotifications.addWarning,
});
const [previewLookbackInterval, setPreviewLookbackInterval] = useState<string>('h');
const [isPreviewLoading, setIsPreviewLoading] = useState<boolean>(false);
const [previewError, setPreviewError] = useState<boolean>(false);
const [previewResult, setPreviewResult] = useState<rt.TypeOf<
typeof alertPreviewSuccessResponsePayloadRT
> | null>(null);
const [timeSize, setTimeSize] = useState<number | undefined>(1);
const [timeUnit, setTimeUnit] = useState<TimeUnit>('m');
const derivedIndexPattern = useMemo(() => createDerivedIndexPattern('metrics'), [
@@ -143,7 +179,7 @@ export const Expressions: React.FC<Props> = (props) => {
const onGroupByChange = useCallback(
(group: string | null | string[]) => {
setAlertParams('groupBy', group || '');
setAlertParams('groupBy', group && group.length ? group : '');
},
[setAlertParams]
);
@@ -224,6 +260,33 @@ export const Expressions: React.FC<Props> = (props) => {
}
}, [alertsContext.metadata, derivedIndexPattern, setAlertParams]);
const onSelectPreviewLookbackInterval = useCallback((e) => {
setPreviewLookbackInterval(e.target.value);
setPreviewResult(null);
}, []);
const onClickPreview = useCallback(async () => {
setIsPreviewLoading(true);
setPreviewResult(null);
setPreviewError(false);
try {
const result = await getAlertPreview({
fetch: alertsContext.http.fetch,
params: {
...pick(alertParams, 'criteria', 'groupBy', 'filterQuery'),
sourceId: alertParams.sourceId,
lookback: previewLookbackInterval as 'h' | 'd' | 'w' | 'M',
alertInterval,
},
});
setPreviewResult(result);
} catch (e) {
setPreviewError(true);
} finally {
setIsPreviewLoading(false);
}
}, [alertParams, alertInterval, alertsContext, previewLookbackInterval]);
useEffect(() => {
if (alertParams.criteria && alertParams.criteria.length) {
setTimeSize(alertParams.criteria[0].timeSize);
@@ -246,6 +309,23 @@ export const Expressions: React.FC<Props> = (props) => {
[onFilterChange]
);
const previewIntervalError = useMemo(() => {
const intervalInSeconds = getIntervalInSeconds(alertInterval);
const lookbackInSeconds = getIntervalInSeconds(`1${previewLookbackInterval}`);
if (intervalInSeconds >= lookbackInSeconds) {
return true;
}
return false;
}, [previewLookbackInterval, alertInterval]);
const isPreviewDisabled = useMemo(() => {
const validationResult = validateMetricThreshold({ criteria: alertParams.criteria } as any);
const hasValidationErrors = Object.values(validationResult.errors).some((result) =>
Object.values(result).some((arr) => Array.isArray(arr) && arr.length)
);
return hasValidationErrors || previewIntervalError;
}, [alertParams.criteria, previewIntervalError]);
return (
<>
<EuiSpacer size={'m'} />
@@ -381,10 +461,191 @@ export const Expressions: React.FC<Props> = (props) => {
}}
/>
</EuiFormRow>
<EuiSpacer size={'m'} />
<EuiFormRow
label={i18n.translate('xpack.infra.metrics.alertFlyout.previewLabel', {
defaultMessage: 'Preview',
})}
fullWidth
compressed
>
<>
<EuiFlexGroup>
<EuiFlexItem>
<EuiSelect
id="selectPreviewLookbackInterval"
value={previewLookbackInterval}
onChange={onSelectPreviewLookbackInterval}
options={previewOptions}
/>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<EuiButton
isLoading={isPreviewLoading}
isDisabled={isPreviewDisabled}
onClick={onClickPreview}
>
{i18n.translate('xpack.infra.metrics.alertFlyout.testAlertTrigger', {
defaultMessage: 'Test alert trigger',
})}
</EuiButton>
</EuiFlexItem>
<EuiSpacer size={'s'} />
</EuiFlexGroup>
{previewResult && !previewIntervalError && !previewResult.resultTotals.tooManyBuckets && (
<>
<EuiSpacer size={'s'} />
<EuiText>
<FormattedMessage
id="xpack.infra.metrics.alertFlyout.alertPreviewResult"
defaultMessage="This alert would have fired {fired} {timeOrTimes} in the past {lookback}"
values={{
timeOrTimes:
previewResult.resultTotals.fired === 1 ? firedTimeLabel : firedTimesLabel,
fired: <strong>{previewResult.resultTotals.fired}</strong>,
lookback: previewOptions.find((e) => e.value === previewLookbackInterval)
?.shortText,
}}
/>{' '}
{alertParams.groupBy ? (
<FormattedMessage
id="xpack.infra.metrics.alertFlyout.alertPreviewGroups"
defaultMessage="across {numberOfGroups} {groupName}{plural}."
values={{
numberOfGroups: <strong>{previewResult.numberOfGroups}</strong>,
groupName: alertParams.groupBy,
plural: previewResult.numberOfGroups !== 1 ? 's' : '',
}}
/>
) : (
<FormattedMessage
id="xpack.infra.metrics.alertFlyout.alertPreviewAllData"
defaultMessage="across the entire infrastructure."
/>
)}
</EuiText>
{alertParams.alertOnNoData && previewResult.resultTotals.noData ? (
<>
<EuiSpacer size={'s'} />
<EuiText>
<FormattedMessage
id="xpack.infra.metrics.alertFlyout.alertPreviewNoDataResult"
defaultMessage="There {were} {noData} result{plural} of no data."
values={{
were: previewResult.resultTotals.noData !== 1 ? 'were' : 'was',
noData: <strong>{previewResult.resultTotals.noData}</strong>,
plural: previewResult.resultTotals.noData !== 1 ? 's' : '',
}}
/>
</EuiText>
</>
) : null}
{previewResult.resultTotals.error ? (
<>
<EuiSpacer size={'s'} />
<EuiText>
<FormattedMessage
id="xpack.infra.metrics.alertFlyout.alertPreviewErrorResult"
defaultMessage="An error occurred when trying to evaluate some of the data."
/>
</EuiText>
</>
) : null}
</>
)}
{previewResult && previewResult.resultTotals.tooManyBuckets ? (
<>
<EuiSpacer size={'s'} />
<EuiText>
<FormattedMessage
id="xpack.infra.metrics.alertFlyout.tooManyBucketsError"
defaultMessage="Too much data to preview. Please select a shorter preview length, or increase the amount of time in the {forTheLast} field."
values={{
forTheLast: <strong>FOR THE LAST</strong>,
}}
/>
</EuiText>
</>
) : null}
{previewIntervalError && (
<>
<EuiSpacer size={'s'} />
<EuiText>
<FormattedMessage
id="xpack.infra.metrics.alertFlyout.previewIntervalTooShort"
defaultMessage="Not enough data to preview. Please select a longer preview length, or increase the amount of time in the {checkEvery} field."
values={{
checkEvery: <strong>check every</strong>,
}}
/>
</EuiText>
</>
)}
{previewError && (
<>
<EuiSpacer size={'s'} />
<EuiText>
<FormattedMessage
id="xpack.infra.metrics.alertFlyout.alertPreviewError"
defaultMessage="An error occurred when trying to preview this alert trigger."
/>
</EuiText>
</>
)}
</>
</EuiFormRow>
<EuiSpacer size={'m'} />
</>
);
};
const previewOptions = [
{
value: 'h',
text: i18n.translate('xpack.infra.metrics.alertFlyout.lastHourLabel', {
defaultMessage: 'Last hour',
}),
shortText: i18n.translate('xpack.infra.metrics.alertFlyout.hourLabel', {
defaultMessage: 'hour',
}),
},
{
value: 'd',
text: i18n.translate('xpack.infra.metrics.alertFlyout.lastDayLabel', {
defaultMessage: 'Last day',
}),
shortText: i18n.translate('xpack.infra.metrics.alertFlyout.dayLabel', {
defaultMessage: 'day',
}),
},
{
value: 'w',
text: i18n.translate('xpack.infra.metrics.alertFlyout.lastWeekLabel', {
defaultMessage: 'Last week',
}),
shortText: i18n.translate('xpack.infra.metrics.alertFlyout.weekLabel', {
defaultMessage: 'week',
}),
},
{
value: 'M',
text: i18n.translate('xpack.infra.metrics.alertFlyout.lastMonthLabel', {
defaultMessage: 'Last month',
}),
shortText: i18n.translate('xpack.infra.metrics.alertFlyout.monthLabel', {
defaultMessage: 'month',
}),
},
];
const firedTimeLabel = i18n.translate('xpack.infra.metrics.alertFlyout.firedTime', {
defaultMessage: 'time',
});
const firedTimesLabel = i18n.translate('xpack.infra.metrics.alertFlyout.firedTimes', {
defaultMessage: 'times',
});
// required for dynamic import
// eslint-disable-next-line import/no-default-export
export default Expressions;
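
To illustrate the previewIntervalError guard added above: the preview only makes sense when the selected lookback is longer than the alert's check interval. A standalone sketch, not part of this diff, using a simplified stand-in for getIntervalInSeconds (the real helper lives in the server utils and handles more cases):

// Simplified stand-in for getIntervalInSeconds; assumes 'M' means roughly 30 days.
const getIntervalInSeconds = (interval: string): number => {
  const unitsToSeconds: Record<string, number> = {
    s: 1, m: 60, h: 3600, d: 86400, w: 604800, M: 2592000,
  };
  const value = parseFloat(interval);
  const unit = interval.slice(-1);
  return value * (unitsToSeconds[unit] ?? 0);
};

// Mirrors the useMemo above: it is an error when the check interval is at least as long as the lookback.
const isPreviewIntervalTooShort = (alertInterval: string, previewLookbackInterval: string) =>
  getIntervalInSeconds(alertInterval) >= getIntervalInSeconds(`1${previewLookbackInterval}`);

console.log(isPreviewIntervalTooShort('5m', 'h')); // false: a 5m check fits inside a 1h lookback
console.log(isPreviewIntervalTooShort('2d', 'd')); // true: checking every 2d cannot be previewed over 1d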


@@ -32,6 +32,7 @@ import {
import { initInventoryMetaRoute } from './routes/inventory_metadata';
import { initLogSourceConfigurationRoutes, initLogSourceStatusRoutes } from './routes/log_sources';
import { initSourceRoute } from './routes/source';
import { initAlertPreviewRoute } from './routes/alerting';
export const initInfraServer = (libs: InfraBackendLibs) => {
const schema = makeExecutableSchema({
@@ -64,4 +65,5 @@ export const initInfraServer = (libs: InfraBackendLibs) => {
initInventoryMetaRoute(libs);
initLogSourceConfigurationRoutes(libs);
initLogSourceStatusRoutes(libs);
initAlertPreviewRoute(libs);
};


@@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Aggregators } from './types';
import { Aggregators } from '../types';
export const createPercentileAggregation = (
type: Aggregators.P95 | Aggregators.P99,
field: string


@@ -0,0 +1,190 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { mapValues, first, last, isNaN } from 'lodash';
import {
TooManyBucketsPreviewExceptionMetadata,
isTooManyBucketsPreviewException,
TOO_MANY_BUCKETS_PREVIEW_EXCEPTION,
} from '../../../../../common/alerting/metrics';
import { InfraSource } from '../../../../../common/http_api/source_api';
import { InfraDatabaseSearchResponse } from '../../../adapters/framework/adapter_types';
import { createAfterKeyHandler } from '../../../../utils/create_afterkey_handler';
import { AlertServices, AlertExecutorOptions } from '../../../../../../alerts/server';
import { getAllCompositeData } from '../../../../utils/get_all_composite_data';
import { MetricExpressionParams, Comparator, Aggregators } from '../types';
import { DOCUMENT_COUNT_I18N } from '../messages';
import { getElasticsearchMetricQuery } from './metric_query';
interface Aggregation {
aggregatedIntervals: {
buckets: Array<{
aggregatedValue: { value: number; values?: Array<{ key: number; value: number }> };
doc_count: number;
}>;
};
}
interface CompositeAggregationsResponse {
groupings: {
buckets: Aggregation[];
};
}
export const evaluateAlert = (
callCluster: AlertServices['callCluster'],
params: AlertExecutorOptions['params'],
config: InfraSource['configuration'],
timeframe?: { start: number; end: number }
) => {
const { criteria, groupBy, filterQuery } = params as {
criteria: MetricExpressionParams[];
groupBy: string | undefined | string[];
filterQuery: string | undefined;
};
return Promise.all(
criteria.map(async (criterion) => {
const currentValues = await getMetric(
callCluster,
criterion,
config.metricAlias,
config.fields.timestamp,
groupBy,
filterQuery,
timeframe
);
const { threshold, comparator } = criterion;
const comparisonFunction = comparatorMap[comparator];
return mapValues(
currentValues,
(values: number | number[] | null | TooManyBucketsPreviewExceptionMetadata) => {
if (isTooManyBucketsPreviewException(values)) throw values;
return {
...criterion,
metric: criterion.metric ?? DOCUMENT_COUNT_I18N,
currentValue: Array.isArray(values) ? last(values) : NaN,
shouldFire: Array.isArray(values)
? values.map((value) => comparisonFunction(value, threshold))
: [false],
isNoData: values === null,
isError: isNaN(values),
};
}
);
})
);
};
const getMetric: (
callCluster: AlertServices['callCluster'],
params: MetricExpressionParams,
index: string,
timefield: string,
groupBy: string | undefined | string[],
filterQuery: string | undefined,
timeframe?: { start: number; end: number }
) => Promise<Record<string, number[]>> = async function (
callCluster,
params,
index,
timefield,
groupBy,
filterQuery,
timeframe
) {
const { aggType } = params;
const hasGroupBy = groupBy && groupBy.length;
const searchBody = getElasticsearchMetricQuery(
params,
timefield,
hasGroupBy ? groupBy : undefined,
filterQuery,
timeframe
);
try {
if (hasGroupBy) {
const bucketSelector = (
response: InfraDatabaseSearchResponse<{}, CompositeAggregationsResponse>
) => response.aggregations?.groupings?.buckets || [];
const afterKeyHandler = createAfterKeyHandler(
'aggs.groupings.composite.after',
(response) => response.aggregations?.groupings?.after_key
);
const compositeBuckets = (await getAllCompositeData(
(body) => callCluster('search', { body, index }),
searchBody,
bucketSelector,
afterKeyHandler
)) as Array<Aggregation & { key: Record<string, string> }>;
return compositeBuckets.reduce(
(result, bucket) => ({
...result,
[Object.values(bucket.key)
.map((value) => value)
.join(', ')]: getValuesFromAggregations(bucket, aggType),
}),
{}
);
}
const result = await callCluster('search', {
body: searchBody,
index,
});
return { '*': getValuesFromAggregations(result.aggregations, aggType) };
} catch (e) {
if (timeframe) {
// This code should only ever be reached when previewing the alert, not executing it
const causedByType = e.body?.error?.caused_by?.type;
if (causedByType === 'too_many_buckets_exception') {
return {
'*': {
[TOO_MANY_BUCKETS_PREVIEW_EXCEPTION]: true,
maxBuckets: e.body.error.caused_by.max_buckets,
},
};
}
}
return { '*': NaN }; // Trigger an Error state
}
};
const getValuesFromAggregations = (
aggregations: Aggregation,
aggType: MetricExpressionParams['aggType']
) => {
try {
const { buckets } = aggregations.aggregatedIntervals;
if (!buckets.length) return null; // No Data state
if (aggType === Aggregators.COUNT) {
return buckets.map((bucket) => bucket.doc_count);
}
if (aggType === Aggregators.P95 || aggType === Aggregators.P99) {
return buckets.map((bucket) => {
const values = bucket.aggregatedValue?.values || [];
const firstValue = first(values);
if (!firstValue) return null;
return firstValue.value;
});
}
return buckets.map((bucket) => bucket.aggregatedValue.value);
} catch (e) {
return NaN; // Error state
}
};
const comparatorMap = {
[Comparator.BETWEEN]: (value: number, [a, b]: number[]) =>
value >= Math.min(a, b) && value <= Math.max(a, b),
[Comparator.OUTSIDE_RANGE]: (value: number, [a, b]: number[]) => value < a || value > b,
// `threshold` is always an array of numbers in case the BETWEEN/OUTSIDE_RANGE comparator is
// used; all other comparators will just destructure the first value in the array
[Comparator.GT]: (a: number, [b]: number[]) => a > b,
[Comparator.LT]: (a: number, [b]: number[]) => a < b,
[Comparator.GT_OR_EQ]: (a: number, [b]: number[]) => a >= b,
[Comparator.LT_OR_EQ]: (a: number, [b]: number[]) => a <= b,
};
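
A small illustration, not part of this diff, of how the comparatorMap above interprets the threshold array: BETWEEN and OUTSIDE_RANGE use both bounds, while the other comparators destructure only the first element.

type ComparatorFn = (value: number, threshold: number[]) => boolean;

// Re-stated here for illustration only; these mirror entries in comparatorMap.
const between: ComparatorFn = (value, [a, b]) => value >= Math.min(a, b) && value <= Math.max(a, b);
const gt: ComparatorFn = (a, [b]) => a > b;

console.log(between(75, [50, 100])); // true: 75 lies inside [50, 100]
console.log(between(75, [100, 50])); // true: bounds are normalized with Math.min/Math.max
console.log(gt(75, [50, 100]));      // true: only the first threshold element is used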


@@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { networkTraffic } from '../../../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
import { MetricExpressionParams, Aggregators } from '../types';
import { getIntervalInSeconds } from '../../../../utils/get_interval_in_seconds';
import { getDateHistogramOffset } from '../../../snapshot/query_helpers';
import { createPercentileAggregation } from './create_percentile_aggregation';
const MINIMUM_BUCKETS = 5;
const getParsedFilterQuery: (
filterQuery: string | undefined
) => Record<string, any> | Array<Record<string, any>> = (filterQuery) => {
if (!filterQuery) return {};
return JSON.parse(filterQuery).bool;
};
export const getElasticsearchMetricQuery = (
{ metric, aggType, timeUnit, timeSize }: MetricExpressionParams,
timefield: string,
groupBy?: string | string[],
filterQuery?: string,
timeframe?: { start: number; end: number }
) => {
if (aggType === Aggregators.COUNT && metric) {
throw new Error('Cannot aggregate document count with a metric');
}
if (aggType !== Aggregators.COUNT && !metric) {
throw new Error('Can only aggregate without a metric if using the document count aggregator');
}
const interval = `${timeSize}${timeUnit}`;
const intervalAsSeconds = getIntervalInSeconds(interval);
const to = timeframe ? timeframe.end : Date.now();
// We need enough data for 5 buckets worth of data. We also need
// to convert the intervalAsSeconds to milliseconds.
const minimumFrom = to - intervalAsSeconds * 1000 * MINIMUM_BUCKETS;
const from = timeframe && timeframe.start <= minimumFrom ? timeframe.start : minimumFrom;
const offset = getDateHistogramOffset(from, interval);
const aggregations =
aggType === Aggregators.COUNT
? {}
: aggType === Aggregators.RATE
? networkTraffic('aggregatedValue', metric)
: aggType === Aggregators.P95 || aggType === Aggregators.P99
? createPercentileAggregation(aggType, metric)
: {
aggregatedValue: {
[aggType]: {
field: metric,
},
},
};
const baseAggs = {
aggregatedIntervals: {
date_histogram: {
field: timefield,
fixed_interval: interval,
offset,
extended_bounds: {
min: from,
max: to,
},
},
aggregations,
},
};
const aggs = groupBy
? {
groupings: {
composite: {
size: 10,
sources: Array.isArray(groupBy)
? groupBy.map((field, index) => ({
[`groupBy${index}`]: {
terms: { field },
},
}))
: [
{
groupBy0: {
terms: {
field: groupBy,
},
},
},
],
},
aggs: baseAggs,
},
}
: baseAggs;
const rangeFilters = [
{
range: {
'@timestamp': {
gte: from,
lte: to,
format: 'epoch_millis',
},
},
},
];
const metricFieldFilters = metric
? [
{
exists: {
field: metric,
},
},
]
: [];
const parsedFilterQuery = getParsedFilterQuery(filterQuery);
return {
query: {
bool: {
filter: [
...rangeFilters,
...metricFieldFilters,
...(Array.isArray(parsedFilterQuery) ? parsedFilterQuery : []),
],
...(!Array.isArray(parsedFilterQuery) ? parsedFilterQuery : {}),
},
},
size: 0,
aggs,
};
};
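
For reference, an approximate sketch of the search body this helper produces for a simple average criterion with no group-by and no filter query; the timestamps and offset are computed at call time, so placeholder values are used here:

// Approximate shape only; gte/lte/min/max/offset are computed from the criterion and timeframe.
const exampleSearchBody = {
  query: {
    bool: {
      filter: [
        { range: { '@timestamp': { gte: 0, lte: 0, format: 'epoch_millis' } } },
        { exists: { field: 'system.cpu.user.pct' } },
      ],
    },
  },
  size: 0,
  aggs: {
    aggregatedIntervals: {
      date_histogram: {
        field: '@timestamp',
        fixed_interval: '1m',
        offset: '0s',
        extended_bounds: { min: 0, max: 0 },
      },
      aggregations: {
        aggregatedValue: { avg: { field: 'system.cpu.user.pct' } },
      },
    },
  },
};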


@@ -383,34 +383,6 @@ const executor = createMetricThresholdExecutor(mockLibs, 'test') as (opts: {
}) => Promise<void>;
const services: AlertServicesMock = alertsMock.createAlertServices();
services.callCluster.mockImplementation(async (_: string, { body, index }: any) => {
if (index === 'alternatebeat-*') return mocks.changedSourceIdResponse;
const metric = body.query.bool.filter[1]?.exists.field;
if (body.aggs.groupings) {
if (body.aggs.groupings.composite.after) {
return mocks.compositeEndResponse;
}
if (metric === 'test.metric.2') {
return mocks.alternateCompositeResponse;
}
return mocks.basicCompositeResponse;
}
if (metric === 'test.metric.2') {
return mocks.alternateMetricResponse;
}
return mocks.basicMetricResponse;
});
services.savedObjectsClient.get.mockImplementation(async (type: string, sourceId: string) => {
if (sourceId === 'alternate')
return {
id: 'alternate',
attributes: { metricAlias: 'alternatebeat-*' },
type,
references: [],
};
return { id: 'default', attributes: { metricAlias: 'metricbeat-*' }, type, references: [] };
});
services.callCluster.mockImplementation(async (_: string, { body, index }: any) => {
if (index === 'alternatebeat-*') return mocks.changedSourceIdResponse;
const metric = body.query.bool.filter[1]?.exists.field;


@@ -3,263 +3,25 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { mapValues, first } from 'lodash';
import { first, last } from 'lodash';
import { i18n } from '@kbn/i18n';
import moment from 'moment';
import { InfraDatabaseSearchResponse } from '../../adapters/framework/adapter_types';
import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler';
import { getAllCompositeData } from '../../../utils/get_all_composite_data';
import { networkTraffic } from '../../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
import { MetricExpressionParams, Comparator, Aggregators, AlertStates } from './types';
import { AlertExecutorOptions } from '../../../../../alerts/server';
import { InfraBackendLibs } from '../../infra_types';
import { AlertStates } from './types';
import { evaluateAlert } from './lib/evaluate_alert';
import {
buildErrorAlertReason,
buildFiredAlertReason,
buildNoDataAlertReason,
DOCUMENT_COUNT_I18N,
stateToAlertMessage,
} from './messages';
import { AlertServices, AlertExecutorOptions } from '../../../../../alerts/server';
import { getIntervalInSeconds } from '../../../utils/get_interval_in_seconds';
import { getDateHistogramOffset } from '../../snapshot/query_helpers';
import { InfraBackendLibs } from '../../infra_types';
import { createPercentileAggregation } from './create_percentile_aggregation';
const TOTAL_BUCKETS = 5;
interface Aggregation {
aggregatedIntervals: {
buckets: Array<{
aggregatedValue: { value: number; values?: Array<{ key: number; value: number }> };
doc_count: number;
}>;
};
}
interface CompositeAggregationsResponse {
groupings: {
buckets: Aggregation[];
};
}
const getCurrentValueFromAggregations = (
aggregations: Aggregation,
aggType: MetricExpressionParams['aggType']
) => {
try {
const { buckets } = aggregations.aggregatedIntervals;
if (!buckets.length) return null; // No Data state
const mostRecentBucket = buckets[buckets.length - 1];
if (aggType === Aggregators.COUNT) {
return mostRecentBucket.doc_count;
}
if (aggType === Aggregators.P95 || aggType === Aggregators.P99) {
const values = mostRecentBucket.aggregatedValue?.values || [];
const firstValue = first(values);
if (!firstValue) return null;
return firstValue.value;
}
const { value } = mostRecentBucket.aggregatedValue;
return value;
} catch (e) {
return undefined; // Error state
}
};
const getParsedFilterQuery: (
filterQuery: string | undefined
) => Record<string, any> | Array<Record<string, any>> = (filterQuery) => {
if (!filterQuery) return {};
return JSON.parse(filterQuery).bool;
};
export const getElasticsearchMetricQuery = (
{ metric, aggType, timeUnit, timeSize }: MetricExpressionParams,
timefield: string,
groupBy?: string | string[],
filterQuery?: string
) => {
if (aggType === Aggregators.COUNT && metric) {
throw new Error('Cannot aggregate document count with a metric');
}
if (aggType !== Aggregators.COUNT && !metric) {
throw new Error('Can only aggregate without a metric if using the document count aggregator');
}
const interval = `${timeSize}${timeUnit}`;
const to = Date.now();
const intervalAsSeconds = getIntervalInSeconds(interval);
// We need enough data for 5 buckets worth of data. We also need
// to convert the intervalAsSeconds to milliseconds.
const from = to - intervalAsSeconds * 1000 * TOTAL_BUCKETS;
const offset = getDateHistogramOffset(from, interval);
const aggregations =
aggType === Aggregators.COUNT
? {}
: aggType === Aggregators.RATE
? networkTraffic('aggregatedValue', metric)
: aggType === Aggregators.P95 || aggType === Aggregators.P99
? createPercentileAggregation(aggType, metric)
: {
aggregatedValue: {
[aggType]: {
field: metric,
},
},
};
const baseAggs = {
aggregatedIntervals: {
date_histogram: {
field: timefield,
fixed_interval: interval,
offset,
extended_bounds: {
min: from,
max: to,
},
},
aggregations,
},
};
const aggs = groupBy
? {
groupings: {
composite: {
size: 10,
sources: Array.isArray(groupBy)
? groupBy.map((field, index) => ({
[`groupBy${index}`]: {
terms: { field },
},
}))
: [
{
groupBy0: {
terms: {
field: groupBy,
},
},
},
],
},
aggs: baseAggs,
},
}
: baseAggs;
const rangeFilters = [
{
range: {
'@timestamp': {
gte: from,
lte: to,
format: 'epoch_millis',
},
},
},
];
const metricFieldFilters = metric
? [
{
exists: {
field: metric,
},
},
]
: [];
const parsedFilterQuery = getParsedFilterQuery(filterQuery);
return {
query: {
bool: {
filter: [
...rangeFilters,
...metricFieldFilters,
...(Array.isArray(parsedFilterQuery) ? parsedFilterQuery : []),
],
...(!Array.isArray(parsedFilterQuery) ? parsedFilterQuery : {}),
},
},
size: 0,
aggs,
};
};
const getMetric: (
services: AlertServices,
params: MetricExpressionParams,
index: string,
timefield: string,
groupBy: string | undefined | string[],
filterQuery: string | undefined
) => Promise<Record<string, number>> = async function (
{ callCluster },
params,
index,
timefield,
groupBy,
filterQuery
) {
const { aggType } = params;
const searchBody = getElasticsearchMetricQuery(params, timefield, groupBy, filterQuery);
try {
if (groupBy) {
const bucketSelector = (
response: InfraDatabaseSearchResponse<{}, CompositeAggregationsResponse>
) => response.aggregations?.groupings?.buckets || [];
const afterKeyHandler = createAfterKeyHandler(
'aggs.groupings.composite.after',
(response) => response.aggregations?.groupings?.after_key
);
const compositeBuckets = (await getAllCompositeData(
(body) => callCluster('search', { body, index }),
searchBody,
bucketSelector,
afterKeyHandler
)) as Array<Aggregation & { key: Record<string, string> }>;
return compositeBuckets.reduce(
(result, bucket) => ({
...result,
[Object.values(bucket.key)
.map((value) => value)
.join(', ')]: getCurrentValueFromAggregations(bucket, aggType),
}),
{}
);
}
const result = await callCluster('search', {
body: searchBody,
index,
});
return { '*': getCurrentValueFromAggregations(result.aggregations, aggType) };
} catch (e) {
return { '*': undefined }; // Trigger an Error state
}
};
const comparatorMap = {
[Comparator.BETWEEN]: (value: number, [a, b]: number[]) =>
value >= Math.min(a, b) && value <= Math.max(a, b),
[Comparator.OUTSIDE_RANGE]: (value: number, [a, b]: number[]) => value < a || value > b,
// `threshold` is always an array of numbers in case the BETWEEN/OUTSIDE_RANGE comparator is
// used; all other comparators will just destructure the first value in the array
[Comparator.GT]: (a: number, [b]: number[]) => a > b,
[Comparator.LT]: (a: number, [b]: number[]) => a < b,
[Comparator.GT_OR_EQ]: (a: number, [b]: number[]) => a >= b,
[Comparator.LT_OR_EQ]: (a: number, [b]: number[]) => a <= b,
};
export const createMetricThresholdExecutor = (libs: InfraBackendLibs, alertId: string) =>
async function ({ services, params }: AlertExecutorOptions) {
const { criteria, groupBy, filterQuery, sourceId, alertOnNoData } = params as {
criteria: MetricExpressionParams[];
groupBy: string | undefined | string[];
filterQuery: string | undefined;
async function (options: AlertExecutorOptions) {
const { services, params } = options;
const { criteria } = params;
const { sourceId, alertOnNoData } = params as {
sourceId?: string;
alertOnNoData: boolean;
};
@@ -269,39 +31,18 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs, alertId: s
sourceId || 'default'
);
const config = source.configuration;
const alertResults = await Promise.all(
criteria.map((criterion) => {
return (async () => {
const currentValues = await getMetric(
services,
criterion,
config.metricAlias,
config.fields.timestamp,
groupBy,
filterQuery
);
const { threshold, comparator } = criterion;
const comparisonFunction = comparatorMap[comparator];
return mapValues(currentValues, (value) => ({
...criterion,
metric: criterion.metric ?? DOCUMENT_COUNT_I18N,
currentValue: value,
shouldFire:
value !== undefined && value !== null && comparisonFunction(value, threshold),
isNoData: value === null,
isError: value === undefined,
}));
})();
})
);
const alertResults = await evaluateAlert(services.callCluster, params, config);
// Because each alert result has the same group definitions, just grap the groups from the first one.
// Because each alert result has the same group definitions, just grab the groups from the first one.
const groups = Object.keys(first(alertResults));
for (const group of groups) {
const alertInstance = services.alertInstanceFactory(`${alertId}-${group}`);
// AND logic; all criteria must be across the threshold
const shouldAlertFire = alertResults.every((result) => result[group].shouldFire);
const shouldAlertFire = alertResults.every((result) =>
// Grab the result of the most recent bucket
last(result[group].shouldFire)
);
// AND logic; because we need to evaluate all criteria, if one of them reports no data then the
// whole alert is in a No Data/Error state
const isNoData = alertResults.some((result) => result[group].isNoData);


@@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { first, zip } from 'lodash';
import {
TOO_MANY_BUCKETS_PREVIEW_EXCEPTION,
isTooManyBucketsPreviewException,
} from '../../../../common/alerting/metrics';
import { IScopedClusterClient } from '../../../../../../../src/core/server';
import { InfraSource } from '../../../../common/http_api/source_api';
import { getIntervalInSeconds } from '../../../utils/get_interval_in_seconds';
import { MetricExpressionParams } from './types';
import { evaluateAlert } from './lib/evaluate_alert';
const MAX_ITERATIONS = 50;
interface PreviewMetricThresholdAlertParams {
callCluster: IScopedClusterClient['callAsCurrentUser'];
params: {
criteria: MetricExpressionParams[];
groupBy: string | undefined | string[];
filterQuery: string | undefined;
};
config: InfraSource['configuration'];
lookback: 'h' | 'd' | 'w' | 'M';
alertInterval: string;
end?: number;
overrideLookbackIntervalInSeconds?: number;
}
export const previewMetricThresholdAlert: (
params: PreviewMetricThresholdAlertParams,
iterations?: number,
precalculatedNumberOfGroups?: number
) => Promise<Array<number | null | typeof TOO_MANY_BUCKETS_PREVIEW_EXCEPTION>> = async (
{
callCluster,
params,
config,
lookback,
alertInterval,
end = Date.now(),
overrideLookbackIntervalInSeconds,
},
iterations = 0,
precalculatedNumberOfGroups
) => {
// There are three different "intervals" we're dealing with here, so to disambiguate:
// - The lookback interval, which is how long of a period of time we want to examine to count
// how many times the alert fired
// - The interval in the alert params, which we'll call the bucket interval; this is how large of
// a time bucket the alert uses to evaluate its result
// - The alert interval, which is how often the alert fires
const { timeSize, timeUnit } = params.criteria[0];
const bucketInterval = `${timeSize}${timeUnit}`;
const bucketIntervalInSeconds = getIntervalInSeconds(bucketInterval);
const lookbackInterval = `1${lookback}`;
const lookbackIntervalInSeconds =
overrideLookbackIntervalInSeconds ?? getIntervalInSeconds(lookbackInterval);
const start = end - lookbackIntervalInSeconds * 1000;
const timeframe = { start, end };
// Get a date histogram using the bucket interval and the lookback interval
try {
const alertResults = await evaluateAlert(callCluster, params, config, timeframe);
const groups = Object.keys(first(alertResults));
// Now determine how to interpolate this histogram based on the alert interval
const alertIntervalInSeconds = getIntervalInSeconds(alertInterval);
const alertResultsPerExecution = alertIntervalInSeconds / bucketIntervalInSeconds;
const previewResults = await Promise.all(
groups.map(async (group) => {
const tooManyBuckets = alertResults.some((alertResult) =>
isTooManyBucketsPreviewException(alertResult[group])
);
if (tooManyBuckets) {
return TOO_MANY_BUCKETS_PREVIEW_EXCEPTION;
}
const isNoData = alertResults.some((alertResult) => alertResult[group].isNoData);
if (isNoData) {
return null;
}
const isError = alertResults.some((alertResult) => alertResult[group].isError);
if (isError) {
return NaN;
}
// Interpolate the buckets returned by evaluateAlert and return a count of how many of these
// buckets would have fired the alert. If the alert interval and bucket interval are the same,
// this will be a 1:1 evaluation of the alert results. If these are different, the interpolation
// will skip some buckets or read some buckets more than once, depending on the differential
const numberOfResultBuckets = first(alertResults)[group].shouldFire.length;
const numberOfExecutionBuckets = Math.floor(
numberOfResultBuckets / alertResultsPerExecution
);
let numberOfTimesFired = 0;
for (let i = 0; i < numberOfExecutionBuckets; i++) {
const mappedBucketIndex = Math.floor(i * alertResultsPerExecution);
const allConditionsFiredInMappedBucket = alertResults.every(
(alertResult) => alertResult[group].shouldFire[mappedBucketIndex]
);
if (allConditionsFiredInMappedBucket) numberOfTimesFired++;
}
return numberOfTimesFired;
})
);
return previewResults;
} catch (e) {
if (isTooManyBucketsPreviewException(e)) {
// If there's too much data on the first request, recursively slice the lookback interval
// until all the data can be retrieved
const basePreviewParams = { callCluster, params, config, lookback, alertInterval };
const { maxBuckets } = e;
// If this is still the first iteration, try to get the number of groups in order to
// calculate max buckets. If this fails, just estimate based on 1 group
const currentAlertResults = !precalculatedNumberOfGroups
? await evaluateAlert(callCluster, params, config)
: [];
const numberOfGroups =
precalculatedNumberOfGroups ?? Math.max(Object.keys(first(currentAlertResults)).length, 1);
const estimatedTotalBuckets =
(lookbackIntervalInSeconds / bucketIntervalInSeconds) * numberOfGroups;
// The minimum number of slices is 2. In case we underestimate the total number of buckets
// in the first iteration, we can bisect the remaining buckets on further recursions to get
// all the data needed
const slices = Math.max(Math.ceil(estimatedTotalBuckets / maxBuckets), 2);
const slicedLookback = Math.floor(lookbackIntervalInSeconds / slices);
// Bail out if it looks like this is going to take too long
if (slicedLookback <= 0 || iterations > MAX_ITERATIONS || slices > MAX_ITERATIONS) {
return [TOO_MANY_BUCKETS_PREVIEW_EXCEPTION];
}
const slicedRequests = [...Array(slices)].map((_, i) => {
return previewMetricThresholdAlert(
{
...basePreviewParams,
end: Math.min(end, start + slicedLookback * (i + 1) * 1000),
overrideLookbackIntervalInSeconds: slicedLookback,
},
iterations + slices,
numberOfGroups
);
});
const results = await Promise.all(slicedRequests);
const zippedResult = zip(...results).map((result) =>
result
// `undefined` values occur if there is no data at all in a certain slice, and that slice
// returns an empty array. This is different from an error or no data state,
// so filter these results out entirely and only regard the remaining, defined results
.filter((value) => typeof value !== 'undefined')
.reduce((a, b) => {
if (typeof a !== 'number') return a;
if (typeof b !== 'number') return b;
return a + b;
})
);
return zippedResult;
} else throw e;
}
};
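
To make the interpolation above concrete: with a one-hour lookback, one-minute buckets, and a five-minute alert interval, evaluateAlert returns roughly 60 result buckets, alertResultsPerExecution is 5, and the loop samples 12 of those buckets, one per simulated execution. A standalone sketch of just that sampling step, assuming the shouldFire arrays are already in hand:

// Counts how many simulated executions would have fired for a single group,
// given per-bucket results for every criterion. Mirrors the loop above, for illustration only.
const countFiredExecutions = (
  shouldFirePerCriterion: boolean[][],
  alertResultsPerExecution: number
) => {
  const numberOfResultBuckets = shouldFirePerCriterion[0].length;
  const numberOfExecutionBuckets = Math.floor(numberOfResultBuckets / alertResultsPerExecution);
  let numberOfTimesFired = 0;
  for (let i = 0; i < numberOfExecutionBuckets; i++) {
    const mappedBucketIndex = Math.floor(i * alertResultsPerExecution);
    if (shouldFirePerCriterion.every((criterion) => criterion[mappedBucketIndex])) {
      numberOfTimesFired++;
    }
  }
  return numberOfTimesFired;
};

// 60 one-minute buckets with the threshold crossed in the second half of the hour,
// checked every 5 minutes: buckets 30, 35, 40, 45, 50, and 55 fire.
const singleCriterion = [Array.from({ length: 60 }, (_, i) => i >= 30)];
console.log(countFiredExecutions(singleCriterion, 5)); // 6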


@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export * from './preview';


@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import {
METRIC_THRESHOLD_ALERT_TYPE_ID,
METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID,
INFRA_ALERT_PREVIEW_PATH,
TOO_MANY_BUCKETS_PREVIEW_EXCEPTION,
alertPreviewRequestParamsRT,
alertPreviewSuccessResponsePayloadRT,
MetricThresholdAlertPreviewRequestParams,
} from '../../../common/alerting/metrics';
import { createValidationFunction } from '../../../common/runtime_types';
import { previewMetricThresholdAlert } from '../../lib/alerting/metric_threshold/preview_metric_threshold_alert';
import { InfraBackendLibs } from '../../lib/infra_types';
export const initAlertPreviewRoute = ({ framework, sources }: InfraBackendLibs) => {
const { callWithRequest } = framework;
framework.registerRoute(
{
method: 'post',
path: INFRA_ALERT_PREVIEW_PATH,
validate: {
body: createValidationFunction(alertPreviewRequestParamsRT),
},
},
framework.router.handleLegacyErrors(async (requestContext, request, response) => {
const { criteria, filterQuery, lookback, sourceId, alertType, alertInterval } = request.body;
const callCluster = (endpoint: string, opts: Record<string, any>) => {
return callWithRequest(requestContext, endpoint, opts);
};
const source = await sources.getSourceConfiguration(
requestContext.core.savedObjects.client,
sourceId || 'default'
);
try {
switch (alertType) {
case METRIC_THRESHOLD_ALERT_TYPE_ID: {
const { groupBy } = request.body as MetricThresholdAlertPreviewRequestParams;
const previewResult = await previewMetricThresholdAlert({
callCluster,
params: { criteria, filterQuery, groupBy },
lookback,
config: source.configuration,
alertInterval,
});
const numberOfGroups = previewResult.length;
const resultTotals = previewResult.reduce(
(totals, groupResult) => {
if (groupResult === TOO_MANY_BUCKETS_PREVIEW_EXCEPTION)
return { ...totals, tooManyBuckets: totals.tooManyBuckets + 1 };
if (groupResult === null) return { ...totals, noData: totals.noData + 1 };
if (isNaN(groupResult)) return { ...totals, error: totals.error + 1 };
return { ...totals, fired: totals.fired + groupResult };
},
{
fired: 0,
noData: 0,
error: 0,
tooManyBuckets: 0,
}
);
return response.ok({
body: alertPreviewSuccessResponsePayloadRT.encode({
numberOfGroups,
resultTotals,
}),
});
}
case METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID: {
// TODO: Add inventory preview functionality
return response.ok({});
}
default:
throw new Error('Unknown alert type');
}
} catch (error) {
return response.customError({
statusCode: error.statusCode ?? 500,
body: {
message: error.message ?? 'An unexpected error occurred',
},
});
}
})
);
};
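
For reference, a hedged sketch of what a call to this endpoint looks like from the browser side (the flyout in this PR builds the same body through getAlertPreview); the field values below are illustrative only:

import { HttpSetup } from 'src/core/public';

// Illustrative request; the body shape is the one validated by alertPreviewRequestParamsRT.
const fetchPreview = (http: HttpSetup) =>
  http.fetch('/api/infra/alerting/preview', {
    method: 'POST',
    body: JSON.stringify({
      alertType: 'metrics.alert.threshold',
      alertInterval: '1m',
      lookback: 'd',
      criteria: [
        { aggType: 'avg', metric: 'system.cpu.user.pct', comparator: '>', threshold: [0.9], timeSize: 1, timeUnit: 'm' },
      ],
      groupBy: 'host.name',
      sourceId: 'default',
    }),
  });

// A successful response matches alertPreviewSuccessResponsePayloadRT:
// { numberOfGroups: number, resultTotals: { fired, noData, error, tooManyBuckets } }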


@@ -5,7 +5,7 @@
*/
import expect from '@kbn/expect';
import { getElasticsearchMetricQuery } from '../../../../plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor';
import { getElasticsearchMetricQuery } from '../../../../plugins/infra/server/lib/alerting/metric_threshold/lib/metric_query';
import { MetricExpressionParams } from '../../../../plugins/infra/server/lib/alerting/metric_threshold/types';
import { FtrProviderContext } from '../../ftr_provider_context';