mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
* Add tests for metric threshold alerts * Fix count aggregator * Remove redundant typedefs Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
7280bae3e6
commit
e7a4d57f93
7 changed files with 712 additions and 236 deletions
|
@ -0,0 +1,244 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { createMetricThresholdExecutor, FIRED_ACTIONS } from './metric_threshold_executor';
|
||||
import { Comparator, AlertStates } from './types';
|
||||
import * as mocks from './test_mocks';
|
||||
import { AlertExecutorOptions } from '../../../../../alerting/server';
|
||||
|
||||
const executor = createMetricThresholdExecutor('test') as (opts: {
|
||||
params: AlertExecutorOptions['params'];
|
||||
services: { callCluster: AlertExecutorOptions['params']['callCluster'] };
|
||||
}) => Promise<void>;
|
||||
const alertInstances = new Map();
|
||||
|
||||
const services = {
|
||||
callCluster(_: string, { body }: any) {
|
||||
const metric = body.query.bool.filter[1].exists.field;
|
||||
if (body.aggs.groupings) {
|
||||
if (body.aggs.groupings.composite.after) {
|
||||
return mocks.compositeEndResponse;
|
||||
}
|
||||
if (metric === 'test.metric.2') {
|
||||
return mocks.alternateCompositeResponse;
|
||||
}
|
||||
return mocks.basicCompositeResponse;
|
||||
}
|
||||
if (metric === 'test.metric.2') {
|
||||
return mocks.alternateMetricResponse;
|
||||
}
|
||||
return mocks.basicMetricResponse;
|
||||
},
|
||||
alertInstanceFactory(instanceID: string) {
|
||||
let state: any;
|
||||
const actionQueue: any[] = [];
|
||||
const instance = {
|
||||
actionQueue: [],
|
||||
get state() {
|
||||
return state;
|
||||
},
|
||||
get mostRecentAction() {
|
||||
return actionQueue.pop();
|
||||
},
|
||||
};
|
||||
alertInstances.set(instanceID, instance);
|
||||
return {
|
||||
instanceID,
|
||||
scheduleActions(id: string, action: any) {
|
||||
actionQueue.push({ id, action });
|
||||
},
|
||||
replaceState(newState: any) {
|
||||
state = newState;
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const baseCriterion = {
|
||||
aggType: 'avg',
|
||||
metric: 'test.metric.1',
|
||||
timeSize: 1,
|
||||
timeUnit: 'm',
|
||||
indexPattern: 'metricbeat-*',
|
||||
};
|
||||
describe('The metric threshold alert type', () => {
|
||||
describe('querying the entire infrastructure', () => {
|
||||
const instanceID = 'test-*';
|
||||
const execute = (comparator: Comparator, threshold: number[]) =>
|
||||
executor({
|
||||
services,
|
||||
params: {
|
||||
criteria: [
|
||||
{
|
||||
...baseCriterion,
|
||||
comparator,
|
||||
threshold,
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
test('alerts as expected with the > comparator', async () => {
|
||||
await execute(Comparator.GT, [0.75]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
await execute(Comparator.GT, [1.5]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
test('alerts as expected with the < comparator', async () => {
|
||||
await execute(Comparator.LT, [1.5]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
await execute(Comparator.LT, [0.75]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
test('alerts as expected with the >= comparator', async () => {
|
||||
await execute(Comparator.GT_OR_EQ, [0.75]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
await execute(Comparator.GT_OR_EQ, [1.0]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
await execute(Comparator.GT_OR_EQ, [1.5]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
test('alerts as expected with the <= comparator', async () => {
|
||||
await execute(Comparator.LT_OR_EQ, [1.5]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
await execute(Comparator.LT_OR_EQ, [1.0]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
await execute(Comparator.LT_OR_EQ, [0.75]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
test('alerts as expected with the between comparator', async () => {
|
||||
await execute(Comparator.BETWEEN, [0, 1.5]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
await execute(Comparator.BETWEEN, [0, 0.75]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
});
|
||||
|
||||
describe('querying with a groupBy parameter', () => {
|
||||
const execute = (comparator: Comparator, threshold: number[]) =>
|
||||
executor({
|
||||
services,
|
||||
params: {
|
||||
groupBy: 'something',
|
||||
criteria: [
|
||||
{
|
||||
...baseCriterion,
|
||||
comparator,
|
||||
threshold,
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
const instanceIdA = 'test-a';
|
||||
const instanceIdB = 'test-b';
|
||||
test('sends an alert when all groups pass the threshold', async () => {
|
||||
await execute(Comparator.GT, [0.75]);
|
||||
expect(alertInstances.get(instanceIdA).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceIdA).state.alertState).toBe(AlertStates.ALERT);
|
||||
expect(alertInstances.get(instanceIdB).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceIdB).state.alertState).toBe(AlertStates.ALERT);
|
||||
});
|
||||
test('sends an alert when only some groups pass the threshold', async () => {
|
||||
await execute(Comparator.LT, [1.5]);
|
||||
expect(alertInstances.get(instanceIdA).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceIdA).state.alertState).toBe(AlertStates.ALERT);
|
||||
expect(alertInstances.get(instanceIdB).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceIdB).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
test('sends no alert when no groups pass the threshold', async () => {
|
||||
await execute(Comparator.GT, [5]);
|
||||
expect(alertInstances.get(instanceIdA).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceIdA).state.alertState).toBe(AlertStates.OK);
|
||||
expect(alertInstances.get(instanceIdB).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceIdB).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
});
|
||||
|
||||
describe('querying with multiple criteria', () => {
|
||||
const execute = (
|
||||
comparator: Comparator,
|
||||
thresholdA: number[],
|
||||
thresholdB: number[],
|
||||
groupBy: string = ''
|
||||
) =>
|
||||
executor({
|
||||
services,
|
||||
params: {
|
||||
groupBy,
|
||||
criteria: [
|
||||
{
|
||||
...baseCriterion,
|
||||
comparator,
|
||||
threshold: thresholdA,
|
||||
},
|
||||
{
|
||||
...baseCriterion,
|
||||
comparator,
|
||||
threshold: thresholdB,
|
||||
metric: 'test.metric.2',
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
test('sends an alert when all criteria cross the threshold', async () => {
|
||||
const instanceID = 'test-*';
|
||||
await execute(Comparator.GT_OR_EQ, [1.0], [3.0]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
});
|
||||
test('sends no alert when some, but not all, criteria cross the threshold', async () => {
|
||||
const instanceID = 'test-*';
|
||||
await execute(Comparator.LT_OR_EQ, [1.0], [3.0]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
test('alerts only on groups that meet all criteria when querying with a groupBy parameter', async () => {
|
||||
const instanceIdA = 'test-a';
|
||||
const instanceIdB = 'test-b';
|
||||
await execute(Comparator.GT_OR_EQ, [1.0], [3.0], 'something');
|
||||
expect(alertInstances.get(instanceIdA).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceIdA).state.alertState).toBe(AlertStates.ALERT);
|
||||
expect(alertInstances.get(instanceIdB).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceIdB).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
});
|
||||
describe('querying with the count aggregator', () => {
|
||||
const instanceID = 'test-*';
|
||||
const execute = (comparator: Comparator, threshold: number[]) =>
|
||||
executor({
|
||||
services,
|
||||
params: {
|
||||
criteria: [
|
||||
{
|
||||
...baseCriterion,
|
||||
comparator,
|
||||
threshold,
|
||||
aggType: 'count',
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
test('alerts based on the doc_count value instead of the aggregatedValue', async () => {
|
||||
await execute(Comparator.GT, [2]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction.id).toBe(FIRED_ACTIONS.id);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.ALERT);
|
||||
await execute(Comparator.LT, [1.5]);
|
||||
expect(alertInstances.get(instanceID).mostRecentAction).toBe(undefined);
|
||||
expect(alertInstances.get(instanceID).state.alertState).toBe(AlertStates.OK);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,255 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import { mapValues } from 'lodash';
|
||||
import { i18n } from '@kbn/i18n';
|
||||
import { InfraDatabaseSearchResponse } from '../../adapters/framework/adapter_types';
|
||||
import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler';
|
||||
import { getAllCompositeData } from '../../../utils/get_all_composite_data';
|
||||
import { networkTraffic } from '../../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
|
||||
import { MetricExpressionParams, Comparator, AlertStates } from './types';
|
||||
import { AlertServices, AlertExecutorOptions } from '../../../../../alerting/server';
|
||||
|
||||
interface Aggregation {
|
||||
aggregatedIntervals: {
|
||||
buckets: Array<{ aggregatedValue: { value: number }; doc_count: number }>;
|
||||
};
|
||||
}
|
||||
|
||||
interface CompositeAggregationsResponse {
|
||||
groupings: {
|
||||
buckets: Aggregation[];
|
||||
};
|
||||
}
|
||||
|
||||
const getCurrentValueFromAggregations = (
|
||||
aggregations: Aggregation,
|
||||
aggType: MetricExpressionParams['aggType']
|
||||
) => {
|
||||
try {
|
||||
const { buckets } = aggregations.aggregatedIntervals;
|
||||
if (!buckets.length) return null; // No Data state
|
||||
const mostRecentBucket = buckets[buckets.length - 1];
|
||||
if (aggType === 'count') {
|
||||
return mostRecentBucket.doc_count;
|
||||
}
|
||||
const { value } = mostRecentBucket.aggregatedValue;
|
||||
return value;
|
||||
} catch (e) {
|
||||
return undefined; // Error state
|
||||
}
|
||||
};
|
||||
|
||||
const getParsedFilterQuery: (
|
||||
filterQuery: string | undefined
|
||||
) => Record<string, any> = filterQuery => {
|
||||
if (!filterQuery) return {};
|
||||
try {
|
||||
return JSON.parse(filterQuery).bool;
|
||||
} catch (e) {
|
||||
return {
|
||||
query_string: {
|
||||
query: filterQuery,
|
||||
analyze_wildcard: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
export const getElasticsearchMetricQuery = (
|
||||
{ metric, aggType, timeUnit, timeSize }: MetricExpressionParams,
|
||||
groupBy?: string,
|
||||
filterQuery?: string
|
||||
) => {
|
||||
const interval = `${timeSize}${timeUnit}`;
|
||||
|
||||
const aggregations =
|
||||
aggType === 'count'
|
||||
? {}
|
||||
: aggType === 'rate'
|
||||
? networkTraffic('aggregatedValue', metric)
|
||||
: {
|
||||
aggregatedValue: {
|
||||
[aggType]: {
|
||||
field: metric,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const baseAggs = {
|
||||
aggregatedIntervals: {
|
||||
date_histogram: {
|
||||
field: '@timestamp',
|
||||
fixed_interval: interval,
|
||||
},
|
||||
aggregations,
|
||||
},
|
||||
};
|
||||
|
||||
const aggs = groupBy
|
||||
? {
|
||||
groupings: {
|
||||
composite: {
|
||||
size: 10,
|
||||
sources: [
|
||||
{
|
||||
groupBy: {
|
||||
terms: {
|
||||
field: groupBy,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
aggs: baseAggs,
|
||||
},
|
||||
}
|
||||
: baseAggs;
|
||||
|
||||
const parsedFilterQuery = getParsedFilterQuery(filterQuery);
|
||||
|
||||
return {
|
||||
query: {
|
||||
bool: {
|
||||
filter: [
|
||||
{
|
||||
range: {
|
||||
'@timestamp': {
|
||||
gte: `now-${interval}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
exists: {
|
||||
field: metric,
|
||||
},
|
||||
},
|
||||
],
|
||||
...parsedFilterQuery,
|
||||
},
|
||||
},
|
||||
size: 0,
|
||||
aggs,
|
||||
};
|
||||
};
|
||||
|
||||
const getMetric: (
|
||||
services: AlertServices,
|
||||
params: MetricExpressionParams,
|
||||
groupBy: string | undefined,
|
||||
filterQuery: string | undefined
|
||||
) => Promise<Record<string, number>> = async function(
|
||||
{ callCluster },
|
||||
params,
|
||||
groupBy,
|
||||
filterQuery
|
||||
) {
|
||||
const { indexPattern, aggType } = params;
|
||||
const searchBody = getElasticsearchMetricQuery(params, groupBy, filterQuery);
|
||||
|
||||
try {
|
||||
if (groupBy) {
|
||||
const bucketSelector = (
|
||||
response: InfraDatabaseSearchResponse<{}, CompositeAggregationsResponse>
|
||||
) => response.aggregations?.groupings?.buckets || [];
|
||||
const afterKeyHandler = createAfterKeyHandler(
|
||||
'aggs.groupings.composite.after',
|
||||
response => response.aggregations?.groupings?.after_key
|
||||
);
|
||||
const compositeBuckets = (await getAllCompositeData(
|
||||
body => callCluster('search', { body, index: indexPattern }),
|
||||
searchBody,
|
||||
bucketSelector,
|
||||
afterKeyHandler
|
||||
)) as Array<Aggregation & { key: { groupBy: string } }>;
|
||||
return compositeBuckets.reduce(
|
||||
(result, bucket) => ({
|
||||
...result,
|
||||
[bucket.key.groupBy]: getCurrentValueFromAggregations(bucket, aggType),
|
||||
}),
|
||||
{}
|
||||
);
|
||||
}
|
||||
const result = await callCluster('search', {
|
||||
body: searchBody,
|
||||
index: indexPattern,
|
||||
});
|
||||
return { '*': getCurrentValueFromAggregations(result.aggregations, aggType) };
|
||||
} catch (e) {
|
||||
return { '*': undefined }; // Trigger an Error state
|
||||
}
|
||||
};
|
||||
|
||||
const comparatorMap = {
|
||||
[Comparator.BETWEEN]: (value: number, [a, b]: number[]) =>
|
||||
value >= Math.min(a, b) && value <= Math.max(a, b),
|
||||
// `threshold` is always an array of numbers in case the BETWEEN comparator is
|
||||
// used; all other compartors will just destructure the first value in the array
|
||||
[Comparator.GT]: (a: number, [b]: number[]) => a > b,
|
||||
[Comparator.LT]: (a: number, [b]: number[]) => a < b,
|
||||
[Comparator.GT_OR_EQ]: (a: number, [b]: number[]) => a >= b,
|
||||
[Comparator.LT_OR_EQ]: (a: number, [b]: number[]) => a <= b,
|
||||
};
|
||||
|
||||
export const createMetricThresholdExecutor = (alertUUID: string) =>
|
||||
async function({ services, params }: AlertExecutorOptions) {
|
||||
const { criteria, groupBy, filterQuery } = params as {
|
||||
criteria: MetricExpressionParams[];
|
||||
groupBy: string | undefined;
|
||||
filterQuery: string | undefined;
|
||||
};
|
||||
|
||||
const alertResults = await Promise.all(
|
||||
criteria.map(criterion =>
|
||||
(async () => {
|
||||
const currentValues = await getMetric(services, criterion, groupBy, filterQuery);
|
||||
const { threshold, comparator } = criterion;
|
||||
const comparisonFunction = comparatorMap[comparator];
|
||||
return mapValues(currentValues, value => ({
|
||||
shouldFire:
|
||||
value !== undefined && value !== null && comparisonFunction(value, threshold),
|
||||
currentValue: value,
|
||||
isNoData: value === null,
|
||||
isError: value === undefined,
|
||||
}));
|
||||
})()
|
||||
)
|
||||
);
|
||||
|
||||
const groups = Object.keys(alertResults[0]);
|
||||
for (const group of groups) {
|
||||
const alertInstance = services.alertInstanceFactory(`${alertUUID}-${group}`);
|
||||
|
||||
// AND logic; all criteria must be across the threshold
|
||||
const shouldAlertFire = alertResults.every(result => result[group].shouldFire);
|
||||
// AND logic; because we need to evaluate all criteria, if one of them reports no data then the
|
||||
// whole alert is in a No Data/Error state
|
||||
const isNoData = alertResults.some(result => result[group].isNoData);
|
||||
const isError = alertResults.some(result => result[group].isError);
|
||||
if (shouldAlertFire) {
|
||||
alertInstance.scheduleActions(FIRED_ACTIONS.id, {
|
||||
group,
|
||||
value: alertResults.map(result => result[group].currentValue),
|
||||
});
|
||||
}
|
||||
// Future use: ability to fetch display current alert state
|
||||
alertInstance.replaceState({
|
||||
alertState: isError
|
||||
? AlertStates.ERROR
|
||||
: isNoData
|
||||
? AlertStates.NO_DATA
|
||||
: shouldAlertFire
|
||||
? AlertStates.ALERT
|
||||
: AlertStates.OK,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
export const FIRED_ACTIONS = {
|
||||
id: 'metrics.threshold.fired',
|
||||
name: i18n.translate('xpack.infra.metrics.alerting.threshold.fired', {
|
||||
defaultMessage: 'Fired',
|
||||
}),
|
||||
};
|
|
@ -4,188 +4,10 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import uuid from 'uuid';
|
||||
import { mapValues } from 'lodash';
|
||||
import { i18n } from '@kbn/i18n';
|
||||
import { schema } from '@kbn/config-schema';
|
||||
import { InfraDatabaseSearchResponse } from '../../adapters/framework/adapter_types';
|
||||
import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler';
|
||||
import { getAllCompositeData } from '../../../utils/get_all_composite_data';
|
||||
import { networkTraffic } from '../../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
|
||||
import {
|
||||
MetricExpressionParams,
|
||||
Comparator,
|
||||
AlertStates,
|
||||
METRIC_THRESHOLD_ALERT_TYPE_ID,
|
||||
} from './types';
|
||||
import { AlertServices, PluginSetupContract } from '../../../../../alerting/server';
|
||||
|
||||
interface Aggregation {
|
||||
aggregatedIntervals: { buckets: Array<{ aggregatedValue: { value: number } }> };
|
||||
}
|
||||
|
||||
interface CompositeAggregationsResponse {
|
||||
groupings: {
|
||||
buckets: Aggregation[];
|
||||
};
|
||||
}
|
||||
|
||||
const FIRED_ACTIONS = {
|
||||
id: 'metrics.threshold.fired',
|
||||
name: i18n.translate('xpack.infra.metrics.alerting.threshold.fired', {
|
||||
defaultMessage: 'Fired',
|
||||
}),
|
||||
};
|
||||
|
||||
const getCurrentValueFromAggregations = (aggregations: Aggregation) => {
|
||||
try {
|
||||
const { buckets } = aggregations.aggregatedIntervals;
|
||||
if (!buckets.length) return null; // No Data state
|
||||
const { value } = buckets[buckets.length - 1].aggregatedValue;
|
||||
return value;
|
||||
} catch (e) {
|
||||
return undefined; // Error state
|
||||
}
|
||||
};
|
||||
|
||||
const getParsedFilterQuery: (
|
||||
filterQuery: string | undefined
|
||||
) => Record<string, any> = filterQuery => {
|
||||
if (!filterQuery) return {};
|
||||
try {
|
||||
return JSON.parse(filterQuery).bool;
|
||||
} catch (e) {
|
||||
return {
|
||||
query_string: {
|
||||
query: filterQuery,
|
||||
analyze_wildcard: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
const getMetric: (
|
||||
services: AlertServices,
|
||||
params: MetricExpressionParams,
|
||||
groupBy: string | undefined,
|
||||
filterQuery: string | undefined
|
||||
) => Promise<Record<string, number>> = async function(
|
||||
{ callCluster },
|
||||
{ metric, aggType, timeUnit, timeSize, indexPattern },
|
||||
groupBy,
|
||||
filterQuery
|
||||
) {
|
||||
const interval = `${timeSize}${timeUnit}`;
|
||||
|
||||
const aggregations =
|
||||
aggType === 'rate'
|
||||
? networkTraffic('aggregatedValue', metric)
|
||||
: {
|
||||
aggregatedValue: {
|
||||
[aggType]: {
|
||||
field: metric,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const baseAggs = {
|
||||
aggregatedIntervals: {
|
||||
date_histogram: {
|
||||
field: '@timestamp',
|
||||
fixed_interval: interval,
|
||||
},
|
||||
aggregations,
|
||||
},
|
||||
};
|
||||
|
||||
const aggs = groupBy
|
||||
? {
|
||||
groupings: {
|
||||
composite: {
|
||||
size: 10,
|
||||
sources: [
|
||||
{
|
||||
groupBy: {
|
||||
terms: {
|
||||
field: groupBy,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
aggs: baseAggs,
|
||||
},
|
||||
}
|
||||
: baseAggs;
|
||||
|
||||
const parsedFilterQuery = getParsedFilterQuery(filterQuery);
|
||||
|
||||
const searchBody = {
|
||||
query: {
|
||||
bool: {
|
||||
filter: [
|
||||
{
|
||||
range: {
|
||||
'@timestamp': {
|
||||
gte: `now-${interval}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
exists: {
|
||||
field: metric,
|
||||
},
|
||||
},
|
||||
],
|
||||
...parsedFilterQuery,
|
||||
},
|
||||
},
|
||||
size: 0,
|
||||
aggs,
|
||||
};
|
||||
|
||||
try {
|
||||
if (groupBy) {
|
||||
const bucketSelector = (
|
||||
response: InfraDatabaseSearchResponse<{}, CompositeAggregationsResponse>
|
||||
) => response.aggregations?.groupings?.buckets || [];
|
||||
const afterKeyHandler = createAfterKeyHandler(
|
||||
'aggs.groupings.composite.after',
|
||||
response => response.aggregations?.groupings?.after_key
|
||||
);
|
||||
const compositeBuckets = (await getAllCompositeData(
|
||||
body => callCluster('search', { body, index: indexPattern }),
|
||||
searchBody,
|
||||
bucketSelector,
|
||||
afterKeyHandler
|
||||
)) as Array<Aggregation & { key: { groupBy: string } }>;
|
||||
return compositeBuckets.reduce(
|
||||
(result, bucket) => ({
|
||||
...result,
|
||||
[bucket.key.groupBy]: getCurrentValueFromAggregations(bucket),
|
||||
}),
|
||||
{}
|
||||
);
|
||||
}
|
||||
const result = await callCluster('search', {
|
||||
body: searchBody,
|
||||
index: indexPattern,
|
||||
});
|
||||
return { '*': getCurrentValueFromAggregations(result.aggregations) };
|
||||
} catch (e) {
|
||||
return { '*': undefined }; // Trigger an Error state
|
||||
}
|
||||
};
|
||||
|
||||
const comparatorMap = {
|
||||
[Comparator.BETWEEN]: (value: number, [a, b]: number[]) =>
|
||||
value >= Math.min(a, b) && value <= Math.max(a, b),
|
||||
// `threshold` is always an array of numbers in case the BETWEEN comparator is
|
||||
// used; all other compartors will just destructure the first value in the array
|
||||
[Comparator.GT]: (a: number, [b]: number[]) => a > b,
|
||||
[Comparator.LT]: (a: number, [b]: number[]) => a < b,
|
||||
[Comparator.GT_OR_EQ]: (a: number, [b]: number[]) => a >= b,
|
||||
[Comparator.LT_OR_EQ]: (a: number, [b]: number[]) => a <= b,
|
||||
};
|
||||
import { PluginSetupContract } from '../../../../../alerting/server';
|
||||
import { createMetricThresholdExecutor, FIRED_ACTIONS } from './metric_threshold_executor';
|
||||
import { METRIC_THRESHOLD_ALERT_TYPE_ID } from './types';
|
||||
|
||||
export async function registerMetricThresholdAlertType(alertingPlugin: PluginSetupContract) {
|
||||
if (!alertingPlugin) {
|
||||
|
@ -217,59 +39,6 @@ export async function registerMetricThresholdAlertType(alertingPlugin: PluginSet
|
|||
},
|
||||
defaultActionGroupId: FIRED_ACTIONS.id,
|
||||
actionGroups: [FIRED_ACTIONS],
|
||||
async executor({ services, params }) {
|
||||
const { criteria, groupBy, filterQuery } = params as {
|
||||
criteria: MetricExpressionParams[];
|
||||
groupBy: string | undefined;
|
||||
filterQuery: string | undefined;
|
||||
};
|
||||
|
||||
const alertResults = await Promise.all(
|
||||
criteria.map(criterion =>
|
||||
(async () => {
|
||||
const currentValues = await getMetric(services, criterion, groupBy, filterQuery);
|
||||
const { threshold, comparator } = criterion;
|
||||
const comparisonFunction = comparatorMap[comparator];
|
||||
|
||||
return mapValues(currentValues, value => ({
|
||||
shouldFire:
|
||||
value !== undefined && value !== null && comparisonFunction(value, threshold),
|
||||
currentValue: value,
|
||||
isNoData: value === null,
|
||||
isError: value === undefined,
|
||||
}));
|
||||
})()
|
||||
)
|
||||
);
|
||||
|
||||
const groups = Object.keys(alertResults[0]);
|
||||
for (const group of groups) {
|
||||
const alertInstance = services.alertInstanceFactory(`${alertUUID}-${group}`);
|
||||
|
||||
// AND logic; all criteria must be across the threshold
|
||||
const shouldAlertFire = alertResults.every(result => result[group].shouldFire);
|
||||
// AND logic; because we need to evaluate all criteria, if one of them reports no data then the
|
||||
// whole alert is in a No Data/Error state
|
||||
const isNoData = alertResults.some(result => result[group].isNoData);
|
||||
const isError = alertResults.some(result => result[group].isError);
|
||||
if (shouldAlertFire) {
|
||||
alertInstance.scheduleActions(FIRED_ACTIONS.id, {
|
||||
group,
|
||||
value: alertResults.map(result => result[group].currentValue),
|
||||
});
|
||||
}
|
||||
|
||||
// Future use: ability to fetch display current alert state
|
||||
alertInstance.replaceState({
|
||||
alertState: isError
|
||||
? AlertStates.ERROR
|
||||
: isNoData
|
||||
? AlertStates.NO_DATA
|
||||
: shouldAlertFire
|
||||
? AlertStates.ALERT
|
||||
: AlertStates.OK,
|
||||
});
|
||||
}
|
||||
},
|
||||
executor: createMetricThresholdExecutor(alertUUID),
|
||||
});
|
||||
}
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
const bucketsA = [
|
||||
{
|
||||
doc_count: 2,
|
||||
aggregatedValue: { value: 0.5 },
|
||||
},
|
||||
{
|
||||
doc_count: 3,
|
||||
aggregatedValue: { value: 1.0 },
|
||||
},
|
||||
];
|
||||
|
||||
const bucketsB = [
|
||||
{
|
||||
doc_count: 4,
|
||||
aggregatedValue: { value: 2.5 },
|
||||
},
|
||||
{
|
||||
doc_count: 5,
|
||||
aggregatedValue: { value: 3.5 },
|
||||
},
|
||||
];
|
||||
|
||||
export const basicMetricResponse = {
|
||||
aggregations: {
|
||||
aggregatedIntervals: {
|
||||
buckets: bucketsA,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const alternateMetricResponse = {
|
||||
aggregations: {
|
||||
aggregatedIntervals: {
|
||||
buckets: bucketsB,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const basicCompositeResponse = {
|
||||
aggregations: {
|
||||
groupings: {
|
||||
after_key: 'foo',
|
||||
buckets: [
|
||||
{
|
||||
key: {
|
||||
groupBy: 'a',
|
||||
},
|
||||
aggregatedIntervals: {
|
||||
buckets: bucketsA,
|
||||
},
|
||||
},
|
||||
{
|
||||
key: {
|
||||
groupBy: 'b',
|
||||
},
|
||||
aggregatedIntervals: {
|
||||
buckets: bucketsB,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
hits: {
|
||||
total: {
|
||||
value: 2,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const alternateCompositeResponse = {
|
||||
aggregations: {
|
||||
groupings: {
|
||||
after_key: 'foo',
|
||||
buckets: [
|
||||
{
|
||||
key: {
|
||||
groupBy: 'a',
|
||||
},
|
||||
aggregatedIntervals: {
|
||||
buckets: bucketsB,
|
||||
},
|
||||
},
|
||||
{
|
||||
key: {
|
||||
groupBy: 'b',
|
||||
},
|
||||
aggregatedIntervals: {
|
||||
buckets: bucketsA,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
hits: {
|
||||
total: {
|
||||
value: 2,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const compositeEndResponse = {
|
||||
aggregations: {},
|
||||
hits: { total: { value: 0 } },
|
||||
};
|
|
@ -33,5 +33,4 @@ export interface MetricExpressionParams {
|
|||
indexPattern: string;
|
||||
threshold: number[];
|
||||
comparator: Comparator;
|
||||
filterQuery: string;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ export default function({ loadTestFile }) {
|
|||
loadTestFile(require.resolve('./sources'));
|
||||
loadTestFile(require.resolve('./waffle'));
|
||||
loadTestFile(require.resolve('./log_item'));
|
||||
loadTestFile(require.resolve('./metrics_alerting'));
|
||||
loadTestFile(require.resolve('./metrics_explorer'));
|
||||
loadTestFile(require.resolve('./feature_controls'));
|
||||
loadTestFile(require.resolve('./ip_to_hostname'));
|
||||
|
|
98
x-pack/test/api_integration/apis/infra/metrics_alerting.ts
Normal file
98
x-pack/test/api_integration/apis/infra/metrics_alerting.ts
Normal file
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import { getElasticsearchMetricQuery } from '../../../../plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor';
|
||||
import { MetricExpressionParams } from '../../../../plugins/infra/server/lib/alerting/metric_threshold/types';
|
||||
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
|
||||
export default function({ getService }: FtrProviderContext) {
|
||||
const client = getService('legacyEs');
|
||||
const index = 'test-index';
|
||||
const baseParams = {
|
||||
metric: 'test.metric',
|
||||
timeUnit: 'm',
|
||||
timeSize: 5,
|
||||
};
|
||||
describe('Metrics Threshold Alerts', () => {
|
||||
before(async () => {
|
||||
await client.index({
|
||||
index,
|
||||
body: {},
|
||||
});
|
||||
});
|
||||
const aggs = ['avg', 'min', 'max', 'rate', 'cardinality', 'count'];
|
||||
|
||||
describe('querying the entire infrastructure', () => {
|
||||
for (const aggType of aggs) {
|
||||
it(`should work with the ${aggType} aggregator`, async () => {
|
||||
const searchBody = getElasticsearchMetricQuery({
|
||||
...baseParams,
|
||||
aggType,
|
||||
} as MetricExpressionParams);
|
||||
const result = await client.search({
|
||||
index,
|
||||
body: searchBody,
|
||||
});
|
||||
expect(result.error).to.not.be.ok();
|
||||
expect(result.hits).to.be.ok();
|
||||
});
|
||||
}
|
||||
it('should work with a filterQuery', async () => {
|
||||
const searchBody = getElasticsearchMetricQuery(
|
||||
{
|
||||
...baseParams,
|
||||
aggType: 'avg',
|
||||
} as MetricExpressionParams,
|
||||
undefined,
|
||||
'{"bool":{"should":[{"match_phrase":{"agent.hostname":"foo"}}],"minimum_should_match":1}}'
|
||||
);
|
||||
const result = await client.search({
|
||||
index,
|
||||
body: searchBody,
|
||||
});
|
||||
expect(result.error).to.not.be.ok();
|
||||
expect(result.hits).to.be.ok();
|
||||
});
|
||||
});
|
||||
describe('querying with a groupBy parameter', () => {
|
||||
for (const aggType of aggs) {
|
||||
it(`should work with the ${aggType} aggregator`, async () => {
|
||||
const searchBody = getElasticsearchMetricQuery(
|
||||
{
|
||||
...baseParams,
|
||||
aggType,
|
||||
} as MetricExpressionParams,
|
||||
'agent.id'
|
||||
);
|
||||
const result = await client.search({
|
||||
index,
|
||||
body: searchBody,
|
||||
});
|
||||
expect(result.error).to.not.be.ok();
|
||||
expect(result.hits).to.be.ok();
|
||||
});
|
||||
}
|
||||
it('should work with a filterQuery', async () => {
|
||||
const searchBody = getElasticsearchMetricQuery(
|
||||
{
|
||||
...baseParams,
|
||||
aggType: 'avg',
|
||||
} as MetricExpressionParams,
|
||||
'agent.id',
|
||||
'{"bool":{"should":[{"match_phrase":{"agent.hostname":"foo"}}],"minimum_should_match":1}}'
|
||||
);
|
||||
const result = await client.search({
|
||||
index,
|
||||
body: searchBody,
|
||||
});
|
||||
expect(result.error).to.not.be.ok();
|
||||
expect(result.hits).to.be.ok();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue