[RAC] Rule registry plugin (#95903)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Dario Gieselaar 2021-04-09 10:35:44 +02:00 committed by GitHub
parent 012f1c199b
commit dfaf3ac8f5
59 changed files with 9844 additions and 1336 deletions


@@ -485,6 +485,10 @@ Elastic.
|Welcome to the Kibana rollup plugin! This plugin provides Kibana support for Elasticsearch's rollup feature. Please refer to the Elasticsearch documentation to understand rollup indices and how to create rollup jobs.
|{kib-repo}blob/{branch}/x-pack/plugins/rule_registry/README.md[ruleRegistry]
|The rule registry plugin aims to make it easy for rule type producers to have their rules produce the data that they need to build rich experiences on top of a unified experience, without the risk of mapping conflicts.
|{kib-repo}blob/{branch}/x-pack/plugins/runtime_fields/README.md[runtimeFields]
|Welcome to the home of the runtime field editor and everything related to runtime fields!
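For orientation: the ruleRegistry entry in this hunk is the plugin this PR adds. The APM changes later in this diff follow the pattern sketched below: a rule producer creates a lifecycle rule type with the factory exported from rule_registry/server and registers it on its registry instance. The import paths mirror the APM files in this diff; the rule id, params schema, and alert fields are illustrative placeholders, not part of the change.

```ts
// Sketch only: the shape of a lifecycle rule type as registered by APM in
// this PR. The rule id, schema, and alert fields below are hypothetical.
import { schema } from '@kbn/config-schema';
import { createLifecycleRuleTypeFactory } from '../../../../rule_registry/server';
import { APMRuleRegistry } from '../../plugin';

const createAPMLifecycleRuleType = createLifecycleRuleTypeFactory<APMRuleRegistry>();

export function registerExampleRuleType(registry: APMRuleRegistry) {
  registry.registerType(
    createAPMLifecycleRuleType({
      id: 'apm.example_threshold', // hypothetical rule id
      name: 'Example threshold',
      actionGroups: [{ id: 'threshold_met', name: 'Threshold met' }],
      defaultActionGroupId: 'threshold_met',
      validate: { params: schema.object({ threshold: schema.number() }) },
      actionVariables: { context: [] },
      producer: 'apm',
      minimumLicenseRequired: 'basic',
      executor: async ({ services, params }) => {
        // alertWithLifecycle writes an alert document with the given fields
        // into the registry's index and returns an alert instance on which
        // actions can be scheduled, as the APM executors below do.
        services
          .alertWithLifecycle({
            id: 'apm.example_threshold_my-service', // hypothetical instance id
            fields: { 'service.name': 'my-service' },
          })
          .scheduleActions('threshold_met', { threshold: params.threshold });
        return {};
      },
    })
  );
}
```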


@@ -370,6 +370,16 @@ export type AggregateOf<
missing: {
doc_count: number;
} & SubAggregateOf<TAggregationContainer, TDocument>;
multi_terms: {
doc_count_error_upper_bound: number;
sum_other_doc_count: number;
buckets: Array<
{
doc_count: number;
key: string[];
} & SubAggregateOf<TAggregationContainer, TDocument>
>;
};
nested: {
doc_count: number;
} & SubAggregateOf<TAggregationContainer, TDocument>;
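With the multi_terms branch added above, a typed request that uses a multi_terms aggregation resolves to buckets whose key is a string[] (one entry per term), plus doc_count_error_upper_bound and sum_other_doc_count. A small consumption sketch; the index and field names are hypothetical, and ESSearchResponse is the repo-local typing this hunk extends:

```ts
import { ESSearchResponse } from 'typings/elasticsearch';

// Hypothetical request: group documents by two fields at once.
const searchParams = {
  index: 'my-index',
  size: 0,
  body: {
    aggs: {
      by_service_and_env: {
        multi_terms: {
          terms: [{ field: 'service.name' }, { field: 'service.environment' }],
        },
      },
    },
  },
};

type Response = ESSearchResponse<unknown, typeof searchParams>;

function logBuckets(response: Response) {
  // key is typed as string[] and doc_count as number, per the hunk above.
  response.aggregations?.by_service_and_env.buckets.forEach((bucket) => {
    console.log(bucket.key.join(' / '), bucket.doc_count);
  });
}
```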


@@ -22,11 +22,13 @@ const environmentLabels: Record<string, string> = {
};
export const ENVIRONMENT_ALL = {
esFieldValue: undefined,
value: ENVIRONMENT_ALL_VALUE,
text: environmentLabels[ENVIRONMENT_ALL_VALUE],
};
export const ENVIRONMENT_NOT_DEFINED = {
esFieldValue: undefined,
value: ENVIRONMENT_NOT_DEFINED_VALUE,
text: environmentLabels[ENVIRONMENT_NOT_DEFINED_VALUE],
};
@@ -35,6 +37,22 @@ export function getEnvironmentLabel(environment: string) {
return environmentLabels[environment] || environment;
}
export function parseEnvironmentUrlParam(environment: string) {
if (environment === ENVIRONMENT_ALL_VALUE) {
return ENVIRONMENT_ALL;
}
if (environment === ENVIRONMENT_NOT_DEFINED_VALUE) {
return ENVIRONMENT_NOT_DEFINED;
}
return {
esFieldValue: environment,
value: environment,
text: environment,
};
}
// returns the environment url param that should be used
// based on the requested environment. If the requested
// environment is different from the URL parameter, we'll
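A short usage sketch for the new parseEnvironmentUrlParam helper; the relative import path is the one used by the APM alerting code later in this diff, and 'production' is just an example value:

```ts
import {
  parseEnvironmentUrlParam,
  ENVIRONMENT_ALL,
  ENVIRONMENT_NOT_DEFINED,
} from '../../../common/environment_filter_values';

// The two sentinel values resolve to the shared constants above,
// both of which have esFieldValue: undefined.
parseEnvironmentUrlParam(ENVIRONMENT_ALL.value); // === ENVIRONMENT_ALL
parseEnvironmentUrlParam(ENVIRONMENT_NOT_DEFINED.value); // === ENVIRONMENT_NOT_DEFINED

// Any other value is treated as a concrete environment name.
parseEnvironmentUrlParam('production');
// => { esFieldValue: 'production', value: 'production', text: 'production' }
```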


@@ -9,7 +9,8 @@
"licensing",
"triggersActionsUi",
"embeddable",
"infra"
"infra",
"observability"
],
"optionalPlugins": [
"spaces",
@@ -18,7 +19,6 @@
"taskManager",
"actions",
"alerting",
"observability",
"security",
"ml",
"home",


@@ -2,6 +2,7 @@
"include": [
"./x-pack/plugins/apm/**/*",
"./x-pack/plugins/observability/**/*",
"./x-pack/plugins/rule_registry/**/*",
"./typings/**/*"
],
"exclude": [


@@ -13,28 +13,28 @@ export const apmActionVariables = {
'xpack.apm.alerts.action_variables.serviceName',
{ defaultMessage: 'The service the alert is created for' }
),
name: 'serviceName',
name: 'serviceName' as const,
},
transactionType: {
description: i18n.translate(
'xpack.apm.alerts.action_variables.transactionType',
{ defaultMessage: 'The transaction type the alert is created for' }
),
name: 'transactionType',
name: 'transactionType' as const,
},
environment: {
description: i18n.translate(
'xpack.apm.alerts.action_variables.environment',
{ defaultMessage: 'The transaction type the alert is created for' }
),
name: 'environment',
name: 'environment' as const,
},
threshold: {
description: i18n.translate('xpack.apm.alerts.action_variables.threshold', {
defaultMessage:
'Any trigger value above this value will cause the alert to fire',
}),
name: 'threshold',
name: 'threshold' as const,
},
triggerValue: {
description: i18n.translate(
@@ -44,7 +44,7 @@ export const apmActionVariables = {
'The value that breached the threshold and triggered the alert',
}
),
name: 'triggerValue',
name: 'triggerValue' as const,
},
interval: {
description: i18n.translate(
@@ -54,6 +54,6 @@ export const apmActionVariables = {
'The length and unit of the time period where the alert conditions were met',
}
),
name: 'interval',
name: 'interval' as const,
},
};
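The as const assertions added above narrow each name from string to its literal type, so consumers of apmActionVariables can derive a union of the variable names at compile time. A minimal, standalone illustration (not part of the diff):

```ts
const loose = { name: 'serviceName' };            // inferred as { name: string }
const narrow = { name: 'serviceName' as const };  // inferred as { name: 'serviceName' }

// With literal names, a union of the APM action variable names can be derived
// and used to type context objects or templates:
type Name = (typeof narrow)['name']; // 'serviceName'
```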


@@ -5,28 +5,24 @@
* 2.0.
*/
import { ApiResponse } from '@elastic/elasticsearch';
import { ThresholdMetActionGroupId } from '../../../common/alert_types';
import {
ESSearchRequest,
ESSearchResponse,
} from '../../../../../../typings/elasticsearch';
import {
AlertInstanceContext,
AlertInstanceState,
AlertServices,
} from '../../../../alerting/server';
import { AlertServices } from '../../../../alerting/server';
export function alertingEsClient<TParams extends ESSearchRequest>(
services: AlertServices<
AlertInstanceState,
AlertInstanceContext,
ThresholdMetActionGroupId
>,
export async function alertingEsClient<TParams extends ESSearchRequest>(
scopedClusterClient: AlertServices<
never,
never,
never
>['scopedClusterClient'],
params: TParams
): Promise<ApiResponse<ESSearchResponse<unknown, TParams>>> {
return (services.scopedClusterClient.asCurrentUser.search({
): Promise<ESSearchResponse<unknown, TParams>> {
const response = await scopedClusterClient.asCurrentUser.search({
...params,
ignore_unavailable: true,
}) as unknown) as Promise<ApiResponse<ESSearchResponse<unknown, TParams>>>;
});
return (response.body as unknown) as ESSearchResponse<unknown, TParams>;
}
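Call sites change accordingly: instead of handing over the whole services object and unwrapping body from an ApiResponse, callers now pass the scoped cluster client and get the typed search body back directly, as the rule executors further down in this diff do. A condensed sketch, with searchParams standing in for a concrete request:

```ts
// Before: the full services object went in and an ApiResponse came back,
// so callers destructured body:
//   const { body: response } = await alertingEsClient(services, searchParams);

// After: pass the scoped cluster client; the ESSearchResponse inferred from
// searchParams is returned directly.
const response = await alertingEsClient(
  services.scopedClusterClient,
  searchParams
);
```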


@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createLifecycleRuleTypeFactory } from '../../../../rule_registry/server';
import { APMRuleRegistry } from '../../plugin';
export const createAPMLifecycleRuleType = createLifecycleRuleTypeFactory<APMRuleRegistry>();


@@ -6,38 +6,25 @@
*/
import { Observable } from 'rxjs';
import { AlertingPlugin } from '../../../../alerting/server';
import { ActionsPlugin } from '../../../../actions/server';
import { Logger } from 'kibana/server';
import { registerTransactionDurationAlertType } from './register_transaction_duration_alert_type';
import { registerTransactionDurationAnomalyAlertType } from './register_transaction_duration_anomaly_alert_type';
import { registerErrorCountAlertType } from './register_error_count_alert_type';
import { APMConfig } from '../..';
import { MlPluginSetup } from '../../../../ml/server';
import { registerTransactionErrorRateAlertType } from './register_transaction_error_rate_alert_type';
import { APMRuleRegistry } from '../../plugin';
interface Params {
alerting: AlertingPlugin['setup'];
actions: ActionsPlugin['setup'];
export interface RegisterRuleDependencies {
registry: APMRuleRegistry;
ml?: MlPluginSetup;
config$: Observable<APMConfig>;
logger: Logger;
}
export function registerApmAlerts(params: Params) {
registerTransactionDurationAlertType({
alerting: params.alerting,
config$: params.config$,
});
registerTransactionDurationAnomalyAlertType({
alerting: params.alerting,
ml: params.ml,
config$: params.config$,
});
registerErrorCountAlertType({
alerting: params.alerting,
config$: params.config$,
});
registerTransactionErrorRateAlertType({
alerting: params.alerting,
config$: params.config$,
});
export function registerApmAlerts(dependencies: RegisterRuleDependencies) {
registerTransactionDurationAlertType(dependencies);
registerTransactionDurationAnomalyAlertType(dependencies);
registerErrorCountAlertType(dependencies);
registerTransactionErrorRateAlertType(dependencies);
}


@@ -5,50 +5,17 @@
* 2.0.
*/
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { toArray, map } from 'rxjs/operators';
import { AlertingPlugin } from '../../../../alerting/server';
import { APMConfig } from '../..';
import { registerErrorCountAlertType } from './register_error_count_alert_type';
import { elasticsearchServiceMock } from 'src/core/server/mocks';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { elasticsearchClientMock } from 'src/core/server/elasticsearch/client/mocks';
type Operator<T1, T2> = (source: Rx.Observable<T1>) => Rx.Observable<T2>;
const pipeClosure = <T1, T2>(fn: Operator<T1, T2>): Operator<T1, T2> => {
return (source: Rx.Observable<T1>) => {
return Rx.defer(() => fn(source));
};
};
const mockedConfig$ = (Rx.of('apm_oss.errorIndices').pipe(
pipeClosure((source$) => {
return source$.pipe(map((i) => i));
}),
toArray()
) as unknown) as Observable<APMConfig>;
import { createRuleTypeMocks } from './test_utils';
describe('Error count alert', () => {
it("doesn't send an alert when error count is less than threshold", async () => {
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
const { services, dependencies, executor } = createRuleTypeMocks();
registerErrorCountAlertType({
alerting,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
registerErrorCountAlertType(dependencies);
const services = {
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
alertInstanceFactory: jest.fn(),
};
const params = { threshold: 1 };
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
@@ -71,30 +38,21 @@ describe('Error count alert', () => {
})
);
await alertExecutor!({ services, params });
await executor({ params });
expect(services.alertInstanceFactory).not.toBeCalled();
});
it('sends alerts with service name and environment', async () => {
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
it('sends alerts with service name and environment for those that exceeded the threshold', async () => {
const {
services,
dependencies,
executor,
scheduleActions,
} = createRuleTypeMocks();
registerErrorCountAlertType({
alerting,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
registerErrorCountAlertType(dependencies);
const scheduleActions = jest.fn();
const services = {
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
};
const params = { threshold: 1, windowSize: 5, windowUnit: 'm' };
const params = { threshold: 2, windowSize: 5, windowUnit: 'm' };
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
@@ -106,18 +64,62 @@ describe('Error count alert', () => {
},
},
aggregations: {
services: {
error_counts: {
buckets: [
{
key: 'foo',
environments: {
buckets: [{ key: 'env-foo' }, { key: 'env-foo-2' }],
key: ['foo', 'env-foo'],
doc_count: 5,
latest: {
top: [
{
metrics: {
'service.name': 'foo',
'service.environment': 'env-foo',
},
},
],
},
},
{
key: 'bar',
environments: {
buckets: [{ key: 'env-bar' }, { key: 'env-bar-2' }],
key: ['foo', 'env-foo-2'],
doc_count: 4,
latest: {
top: [
{
metrics: {
'service.name': 'foo',
'service.environment': 'env-foo-2',
},
},
],
},
},
{
key: ['bar', 'env-bar'],
doc_count: 3,
latest: {
top: [
{
metrics: {
'service.name': 'bar',
'service.environment': 'env-bar',
},
},
],
},
},
{
key: ['bar', 'env-bar-2'],
doc_count: 1,
latest: {
top: [
{
metrics: {
'service.name': 'bar',
'service.environment': 'env-bar-2',
},
},
],
},
},
],
@@ -134,115 +136,36 @@ describe('Error count alert', () => {
})
);
await alertExecutor!({ services, params });
await executor({ params });
[
'apm.error_rate_foo_env-foo',
'apm.error_rate_foo_env-foo-2',
'apm.error_rate_bar_env-bar',
'apm.error_rate_bar_env-bar-2',
].forEach((instanceName) =>
expect(services.alertInstanceFactory).toHaveBeenCalledWith(instanceName)
);
expect(scheduleActions).toHaveBeenCalledTimes(3);
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
environment: 'env-foo',
threshold: 1,
triggerValue: 2,
threshold: 2,
triggerValue: 5,
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
environment: 'env-foo-2',
threshold: 1,
triggerValue: 2,
threshold: 2,
triggerValue: 4,
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
environment: 'env-bar',
threshold: 1,
triggerValue: 2,
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
environment: 'env-bar-2',
threshold: 1,
triggerValue: 2,
interval: '5m',
});
});
it('sends alerts with service name', async () => {
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
registerErrorCountAlertType({
alerting,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
const scheduleActions = jest.fn();
const services = {
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
};
const params = { threshold: 1, windowSize: 5, windowUnit: 'm' };
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [],
total: {
relation: 'eq',
value: 2,
},
},
aggregations: {
services: {
buckets: [
{
key: 'foo',
},
{
key: 'bar',
},
],
},
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);
await alertExecutor!({ services, params });
['apm.error_rate_foo', 'apm.error_rate_bar'].forEach((instanceName) =>
expect(services.alertInstanceFactory).toHaveBeenCalledWith(instanceName)
);
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
environment: undefined,
threshold: 1,
triggerValue: 2,
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
environment: undefined,
threshold: 1,
triggerValue: 2,
threshold: 2,
triggerValue: 3,
interval: '5m',
});
});


@@ -5,22 +5,11 @@
* 2.0.
*/
import { schema, TypeOf } from '@kbn/config-schema';
import { isEmpty } from 'lodash';
import { Observable } from 'rxjs';
import { schema } from '@kbn/config-schema';
import { take } from 'rxjs/operators';
import { APMConfig } from '../..';
import {
AlertingPlugin,
AlertInstanceContext,
AlertInstanceState,
AlertTypeState,
} from '../../../../alerting/server';
import {
AlertType,
ALERT_TYPES_CONFIG,
ThresholdMetActionGroupId,
} from '../../../common/alert_types';
import { ENVIRONMENT_NOT_DEFINED } from '../../../common/environment_filter_values';
import { asMutableArray } from '../../../common/utils/as_mutable_array';
import { AlertType, ALERT_TYPES_CONFIG } from '../../../common/alert_types';
import {
PROCESSOR_EVENT,
SERVICE_ENVIRONMENT,
@@ -31,11 +20,8 @@ import { environmentQuery } from '../../../server/utils/queries';
import { getApmIndices } from '../settings/apm_indices/get_apm_indices';
import { apmActionVariables } from './action_variables';
import { alertingEsClient } from './alerting_es_client';
interface RegisterAlertParams {
alerting: AlertingPlugin['setup'];
config$: Observable<APMConfig>;
}
import { RegisterRuleDependencies } from './register_apm_alerts';
import { createAPMLifecycleRuleType } from './create_apm_lifecycle_rule_type';
const paramsSchema = schema.object({
windowSize: schema.number(),
@@ -48,127 +34,131 @@ const paramsSchema = schema.object({
const alertTypeConfig = ALERT_TYPES_CONFIG[AlertType.ErrorCount];
export function registerErrorCountAlertType({
alerting,
registry,
config$,
}: RegisterAlertParams) {
alerting.registerType<
TypeOf<typeof paramsSchema>,
AlertTypeState,
AlertInstanceState,
AlertInstanceContext,
ThresholdMetActionGroupId
>({
id: AlertType.ErrorCount,
name: alertTypeConfig.name,
actionGroups: alertTypeConfig.actionGroups,
defaultActionGroupId: alertTypeConfig.defaultActionGroupId,
validate: {
params: paramsSchema,
},
actionVariables: {
context: [
apmActionVariables.serviceName,
apmActionVariables.environment,
apmActionVariables.threshold,
apmActionVariables.triggerValue,
apmActionVariables.interval,
],
},
producer: 'apm',
minimumLicenseRequired: 'basic',
executor: async ({ services, params }) => {
const config = await config$.pipe(take(1)).toPromise();
const alertParams = params;
const indices = await getApmIndices({
config,
savedObjectsClient: services.savedObjectsClient,
});
const maxServiceEnvironments = config['xpack.apm.maxServiceEnvironments'];
}: RegisterRuleDependencies) {
registry.registerType(
createAPMLifecycleRuleType({
id: AlertType.ErrorCount,
name: alertTypeConfig.name,
actionGroups: alertTypeConfig.actionGroups,
defaultActionGroupId: alertTypeConfig.defaultActionGroupId,
validate: {
params: paramsSchema,
},
actionVariables: {
context: [
apmActionVariables.serviceName,
apmActionVariables.environment,
apmActionVariables.threshold,
apmActionVariables.triggerValue,
apmActionVariables.interval,
],
},
producer: 'apm',
minimumLicenseRequired: 'basic',
executor: async ({ services, params }) => {
const config = await config$.pipe(take(1)).toPromise();
const alertParams = params;
const indices = await getApmIndices({
config,
savedObjectsClient: services.savedObjectsClient,
});
const searchParams = {
index: indices['apm_oss.errorIndices'],
size: 0,
body: {
track_total_hits: true,
query: {
bool: {
filter: [
{
range: {
'@timestamp': {
gte: `now-${alertParams.windowSize}${alertParams.windowUnit}`,
const searchParams = {
index: indices['apm_oss.errorIndices'],
size: 0,
body: {
query: {
bool: {
filter: [
{
range: {
'@timestamp': {
gte: `now-${alertParams.windowSize}${alertParams.windowUnit}`,
},
},
},
{ term: { [PROCESSOR_EVENT]: ProcessorEvent.error } },
...(alertParams.serviceName
? [{ term: { [SERVICE_NAME]: alertParams.serviceName } }]
: []),
...environmentQuery(alertParams.environment),
],
},
},
aggs: {
error_counts: {
multi_terms: {
terms: [
{ field: SERVICE_NAME },
{ field: SERVICE_ENVIRONMENT, missing: '' },
],
size: 10000,
},
aggs: {
latest: {
top_metrics: {
metrics: asMutableArray([
{ field: SERVICE_NAME },
{ field: SERVICE_ENVIRONMENT },
] as const),
sort: {
'@timestamp': 'desc' as const,
},
},
},
},
{ term: { [PROCESSOR_EVENT]: ProcessorEvent.error } },
...(alertParams.serviceName
? [{ term: { [SERVICE_NAME]: alertParams.serviceName } }]
: []),
...environmentQuery(alertParams.environment),
],
},
},
},
aggs: {
services: {
terms: {
field: SERVICE_NAME,
size: 50,
},
aggs: {
environments: {
terms: {
field: SERVICE_ENVIRONMENT,
size: maxServiceEnvironments,
},
};
const response = await alertingEsClient(
services.scopedClusterClient,
searchParams
);
const errorCountResults =
response.aggregations?.error_counts.buckets.map((bucket) => {
const latest = bucket.latest.top[0].metrics;
return {
serviceName: latest['service.name'] as string,
environment: latest['service.environment'] as string | undefined,
errorCount: bucket.doc_count,
};
}) ?? [];
errorCountResults
.filter((result) => result.errorCount >= alertParams.threshold)
.forEach((result) => {
const { serviceName, environment, errorCount } = result;
services
.alertWithLifecycle({
id: [AlertType.ErrorCount, serviceName, environment]
.filter((name) => name)
.join('_'),
fields: {
[SERVICE_NAME]: serviceName,
...(environment
? { [SERVICE_ENVIRONMENT]: environment }
: {}),
[PROCESSOR_EVENT]: 'error',
},
},
},
},
},
};
const { body: response } = await alertingEsClient(services, searchParams);
const errorCount = response.hits.total.value;
if (errorCount > alertParams.threshold) {
function scheduleAction({
serviceName,
environment,
}: {
serviceName: string;
environment?: string;
}) {
const alertInstanceName = [
AlertType.ErrorCount,
serviceName,
environment,
]
.filter((name) => name)
.join('_');
const alertInstance = services.alertInstanceFactory(
alertInstanceName
);
alertInstance.scheduleActions(alertTypeConfig.defaultActionGroupId, {
serviceName,
environment,
threshold: alertParams.threshold,
triggerValue: errorCount,
interval: `${alertParams.windowSize}${alertParams.windowUnit}`,
})
.scheduleActions(alertTypeConfig.defaultActionGroupId, {
serviceName,
environment: environment || ENVIRONMENT_NOT_DEFINED.text,
threshold: alertParams.threshold,
triggerValue: errorCount,
interval: `${alertParams.windowSize}${alertParams.windowUnit}`,
});
});
}
response.aggregations?.services.buckets.forEach((serviceBucket) => {
const serviceName = serviceBucket.key as string;
if (isEmpty(serviceBucket.environments?.buckets)) {
scheduleAction({ serviceName });
} else {
serviceBucket.environments.buckets.forEach((envBucket) => {
const environment = envBucket.key as string;
scheduleAction({ serviceName, environment });
});
}
});
}
},
});
return {};
},
})
);
}


@@ -6,10 +6,9 @@
*/
import { schema } from '@kbn/config-schema';
import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';
import { APMConfig } from '../..';
import { AlertingPlugin } from '../../../../alerting/server';
import { QueryContainer } from '@elastic/elasticsearch/api/types';
import { parseEnvironmentUrlParam } from '../../../common/environment_filter_values';
import { AlertType, ALERT_TYPES_CONFIG } from '../../../common/alert_types';
import {
PROCESSOR_EVENT,
@@ -24,11 +23,8 @@ import { environmentQuery } from '../../../server/utils/queries';
import { getApmIndices } from '../settings/apm_indices/get_apm_indices';
import { apmActionVariables } from './action_variables';
import { alertingEsClient } from './alerting_es_client';
interface RegisterAlertParams {
alerting: AlertingPlugin['setup'];
config$: Observable<APMConfig>;
}
import { RegisterRuleDependencies } from './register_apm_alerts';
import { createAPMLifecycleRuleType } from './create_apm_lifecycle_rule_type';
const paramsSchema = schema.object({
serviceName: schema.string(),
@@ -47,116 +43,126 @@ const paramsSchema = schema.object({
const alertTypeConfig = ALERT_TYPES_CONFIG[AlertType.TransactionDuration];
export function registerTransactionDurationAlertType({
alerting,
registry,
config$,
}: RegisterAlertParams) {
alerting.registerType({
id: AlertType.TransactionDuration,
name: alertTypeConfig.name,
actionGroups: alertTypeConfig.actionGroups,
defaultActionGroupId: alertTypeConfig.defaultActionGroupId,
validate: {
params: paramsSchema,
},
actionVariables: {
context: [
apmActionVariables.serviceName,
apmActionVariables.transactionType,
apmActionVariables.environment,
apmActionVariables.threshold,
apmActionVariables.triggerValue,
apmActionVariables.interval,
],
},
producer: 'apm',
minimumLicenseRequired: 'basic',
executor: async ({ services, params }) => {
const config = await config$.pipe(take(1)).toPromise();
const alertParams = params;
const indices = await getApmIndices({
config,
savedObjectsClient: services.savedObjectsClient,
});
const maxServiceEnvironments = config['xpack.apm.maxServiceEnvironments'];
}: RegisterRuleDependencies) {
registry.registerType(
createAPMLifecycleRuleType({
id: AlertType.TransactionDuration,
name: alertTypeConfig.name,
actionGroups: alertTypeConfig.actionGroups,
defaultActionGroupId: alertTypeConfig.defaultActionGroupId,
validate: {
params: paramsSchema,
},
actionVariables: {
context: [
apmActionVariables.serviceName,
apmActionVariables.transactionType,
apmActionVariables.environment,
apmActionVariables.threshold,
apmActionVariables.triggerValue,
apmActionVariables.interval,
],
},
producer: 'apm',
minimumLicenseRequired: 'basic',
executor: async ({ services, params }) => {
const config = await config$.pipe(take(1)).toPromise();
const alertParams = params;
const indices = await getApmIndices({
config,
savedObjectsClient: services.savedObjectsClient,
});
const searchParams = {
index: indices['apm_oss.transactionIndices'],
size: 0,
body: {
query: {
bool: {
filter: [
{
range: {
'@timestamp': {
gte: `now-${alertParams.windowSize}${alertParams.windowUnit}`,
const searchParams = {
index: indices['apm_oss.transactionIndices'],
size: 0,
body: {
query: {
bool: {
filter: [
{
range: {
'@timestamp': {
gte: `now-${alertParams.windowSize}${alertParams.windowUnit}`,
},
},
},
},
{ term: { [PROCESSOR_EVENT]: ProcessorEvent.transaction } },
{ term: { [SERVICE_NAME]: alertParams.serviceName } },
{ term: { [TRANSACTION_TYPE]: alertParams.transactionType } },
...environmentQuery(alertParams.environment),
],
},
},
aggs: {
agg:
alertParams.aggregationType === 'avg'
? { avg: { field: TRANSACTION_DURATION } }
: {
percentiles: {
field: TRANSACTION_DURATION,
percents: [
alertParams.aggregationType === '95th' ? 95 : 99,
],
},
},
environments: {
terms: {
field: SERVICE_ENVIRONMENT,
size: maxServiceEnvironments,
{ term: { [PROCESSOR_EVENT]: ProcessorEvent.transaction } },
{ term: { [SERVICE_NAME]: alertParams.serviceName } },
{ term: { [TRANSACTION_TYPE]: alertParams.transactionType } },
...environmentQuery(alertParams.environment),
] as QueryContainer[],
},
},
aggs: {
latency:
alertParams.aggregationType === 'avg'
? { avg: { field: TRANSACTION_DURATION } }
: {
percentiles: {
field: TRANSACTION_DURATION,
percents: [
alertParams.aggregationType === '95th' ? 95 : 99,
],
},
},
},
},
},
};
};
const { body: response } = await alertingEsClient(services, searchParams);
const response = await alertingEsClient(
services.scopedClusterClient,
searchParams
);
if (!response.aggregations) {
return;
}
if (!response.aggregations) {
return {};
}
const { agg, environments } = response.aggregations;
const { latency } = response.aggregations;
const transactionDuration =
'values' in agg ? Object.values(agg.values)[0] : agg?.value;
const transactionDuration =
'values' in latency
? Object.values(latency.values)[0]
: latency?.value;
const threshold = alertParams.threshold * 1000;
const threshold = alertParams.threshold * 1000;
if (transactionDuration && transactionDuration > threshold) {
const durationFormatter = getDurationFormatter(transactionDuration);
const transactionDurationFormatted = durationFormatter(
transactionDuration
).formatted;
if (transactionDuration && transactionDuration > threshold) {
const durationFormatter = getDurationFormatter(transactionDuration);
const transactionDurationFormatted = durationFormatter(
transactionDuration
).formatted;
environments.buckets.map((bucket) => {
const environment = bucket.key;
const alertInstance = services.alertInstanceFactory(
`${AlertType.TransactionDuration}_${environment}`
const environmentParsed = parseEnvironmentUrlParam(
alertParams.environment
);
alertInstance.scheduleActions(alertTypeConfig.defaultActionGroupId, {
transactionType: alertParams.transactionType,
serviceName: alertParams.serviceName,
environment,
threshold,
triggerValue: transactionDurationFormatted,
interval: `${alertParams.windowSize}${alertParams.windowUnit}`,
});
});
}
},
});
services
.alertWithLifecycle({
id: `${AlertType.TransactionDuration}_${environmentParsed.text}`,
fields: {
[SERVICE_NAME]: alertParams.serviceName,
...(environmentParsed.esFieldValue
? { [SERVICE_ENVIRONMENT]: environmentParsed.esFieldValue }
: {}),
[TRANSACTION_TYPE]: alertParams.transactionType,
},
})
.scheduleActions(alertTypeConfig.defaultActionGroupId, {
transactionType: alertParams.transactionType,
serviceName: alertParams.serviceName,
environment: environmentParsed.text,
threshold,
triggerValue: transactionDurationFormatted,
interval: `${alertParams.windowSize}${alertParams.windowUnit}`,
});
}
return {};
},
})
);
}


@@ -4,29 +4,11 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { toArray, map } from 'rxjs/operators';
import { AlertingPlugin } from '../../../../alerting/server';
import { registerTransactionDurationAnomalyAlertType } from './register_transaction_duration_anomaly_alert_type';
import { APMConfig } from '../..';
import { ANOMALY_SEVERITY } from '../../../../ml/common';
import { Job, MlPluginSetup } from '../../../../ml/server';
import * as GetServiceAnomalies from '../service_map/get_service_anomalies';
type Operator<T1, T2> = (source: Rx.Observable<T1>) => Rx.Observable<T2>;
const pipeClosure = <T1, T2>(fn: Operator<T1, T2>): Operator<T1, T2> => {
return (source: Rx.Observable<T1>) => {
return Rx.defer(() => fn(source));
};
};
const mockedConfig$ = (Rx.of('apm_oss.errorIndices').pipe(
pipeClosure((source$) => {
return source$.pipe(map((i) => i));
}),
toArray()
) as unknown) as Observable<APMConfig>;
import { createRuleTypeMocks } from './test_utils';
describe('Transaction duration anomaly alert', () => {
afterEach(() => {
@@ -34,28 +16,21 @@ describe('Transaction duration anomaly alert', () => {
});
describe("doesn't send alert", () => {
it('ml is not defined', async () => {
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
const { services, dependencies, executor } = createRuleTypeMocks();
registerTransactionDurationAnomalyAlertType({
alerting,
...dependencies,
ml: undefined,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
const services = {
callCluster: jest.fn(),
alertInstanceFactory: jest.fn(),
};
const params = { anomalySeverityType: ANOMALY_SEVERITY.MINOR };
await alertExecutor!({ services, params });
expect(services.callCluster).not.toHaveBeenCalled();
await executor({ params });
expect(
services.scopedClusterClient.asCurrentUser.search
).not.toHaveBeenCalled();
expect(services.alertInstanceFactory).not.toHaveBeenCalled();
});
@@ -64,13 +39,7 @@ describe('Transaction duration anomaly alert', () => {
.spyOn(GetServiceAnomalies, 'getMLJobs')
.mockReturnValue(Promise.resolve([]));
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
const { services, dependencies, executor } = createRuleTypeMocks();
const ml = ({
mlSystemProvider: () => ({ mlAnomalySearch: jest.fn() }),
@@ -78,117 +47,127 @@ describe('Transaction duration anomaly alert', () => {
} as unknown) as MlPluginSetup;
registerTransactionDurationAnomalyAlertType({
alerting,
...dependencies,
ml,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
const services = {
callCluster: jest.fn(),
alertInstanceFactory: jest.fn(),
};
const params = { anomalySeverityType: ANOMALY_SEVERITY.MINOR };
await alertExecutor!({ services, params });
expect(services.callCluster).not.toHaveBeenCalled();
await executor({ params });
expect(
services.scopedClusterClient.asCurrentUser.search
).not.toHaveBeenCalled();
expect(services.alertInstanceFactory).not.toHaveBeenCalled();
});
it('anomaly is less than threshold', async () => {
jest
.spyOn(GetServiceAnomalies, 'getMLJobs')
.mockReturnValue(
Promise.resolve([{ job_id: '1' }, { job_id: '2' }] as Job[])
);
jest.spyOn(GetServiceAnomalies, 'getMLJobs').mockReturnValue(
Promise.resolve(([
{
job_id: '1',
custom_settings: { job_tags: { environment: 'development' } },
},
{
job_id: '2',
custom_settings: { job_tags: { environment: 'production' } },
},
] as unknown) as Job[])
);
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
const { services, dependencies, executor } = createRuleTypeMocks();
const ml = ({
mlSystemProvider: () => ({
mlAnomalySearch: () => ({
hits: { total: { value: 0 } },
aggregations: {
anomaly_groups: {
buckets: [
{
doc_count: 1,
latest_score: {
top: [{ metrics: { record_score: 0, job_id: '1' } }],
},
},
],
},
},
}),
}),
anomalyDetectorsProvider: jest.fn(),
} as unknown) as MlPluginSetup;
registerTransactionDurationAnomalyAlertType({
alerting,
...dependencies,
ml,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
const services = {
callCluster: jest.fn(),
alertInstanceFactory: jest.fn(),
};
const params = { anomalySeverityType: ANOMALY_SEVERITY.MINOR };
await alertExecutor!({ services, params });
expect(services.callCluster).not.toHaveBeenCalled();
await executor({ params });
expect(
services.scopedClusterClient.asCurrentUser.search
).not.toHaveBeenCalled();
expect(services.alertInstanceFactory).not.toHaveBeenCalled();
});
});
describe('sends alert', () => {
it('with service name, environment and transaction type', async () => {
it('for all services that exceeded the threshold', async () => {
jest.spyOn(GetServiceAnomalies, 'getMLJobs').mockReturnValue(
Promise.resolve([
Promise.resolve(([
{
job_id: '1',
custom_settings: {
job_tags: {
environment: 'production',
},
},
} as unknown,
custom_settings: { job_tags: { environment: 'development' } },
},
{
job_id: '2',
custom_settings: {
job_tags: {
environment: 'production',
},
},
} as unknown,
] as Job[])
custom_settings: { job_tags: { environment: 'production' } },
},
] as unknown) as Job[])
);
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
const {
services,
dependencies,
executor,
scheduleActions,
} = createRuleTypeMocks();
const ml = ({
mlSystemProvider: () => ({
mlAnomalySearch: () => ({
hits: { total: { value: 2 } },
aggregations: {
services: {
anomaly_groups: {
buckets: [
{
key: 'foo',
transaction_types: {
buckets: [{ key: 'type-foo' }],
latest_score: {
top: [
{
metrics: {
record_score: 80,
job_id: '1',
partition_field_value: 'foo',
by_field_value: 'type-foo',
},
},
],
},
record_avg: { value: 80 },
},
{
key: 'bar',
transaction_types: {
buckets: [{ key: 'type-bar' }],
latest_score: {
top: [
{
metrics: {
record_score: 20,
job_id: '2',
partition_field_value: 'bar',
by_field_value: 'type-bar',
},
},
],
},
record_avg: { value: 20 },
},
],
},
@@ -199,145 +178,26 @@ describe('Transaction duration anomaly alert', () => {
} as unknown) as MlPluginSetup;
registerTransactionDurationAnomalyAlertType({
alerting,
...dependencies,
ml,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
const scheduleActions = jest.fn();
const services = {
callCluster: jest.fn(),
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
};
const params = { anomalySeverityType: ANOMALY_SEVERITY.MINOR };
await alertExecutor!({ services, params });
await executor({ params });
await alertExecutor!({ services, params });
[
'apm.transaction_duration_anomaly_foo_production_type-foo',
'apm.transaction_duration_anomaly_bar_production_type-bar',
].forEach((instanceName) =>
expect(services.alertInstanceFactory).toHaveBeenCalledWith(instanceName)
expect(services.alertInstanceFactory).toHaveBeenCalledTimes(1);
expect(services.alertInstanceFactory).toHaveBeenCalledWith(
'apm.transaction_duration_anomaly_foo_development_type-foo'
);
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
transactionType: 'type-foo',
environment: 'production',
environment: 'development',
threshold: 'minor',
thresholdValue: 'critical',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
transactionType: 'type-bar',
environment: 'production',
threshold: 'minor',
thresholdValue: 'warning',
});
});
it('with service name', async () => {
jest.spyOn(GetServiceAnomalies, 'getMLJobs').mockReturnValue(
Promise.resolve([
{
job_id: '1',
custom_settings: {
job_tags: {
environment: 'production',
},
},
} as unknown,
{
job_id: '2',
custom_settings: {
job_tags: {
environment: 'testing',
},
},
} as unknown,
] as Job[])
);
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
const ml = ({
mlSystemProvider: () => ({
mlAnomalySearch: () => ({
hits: { total: { value: 2 } },
aggregations: {
services: {
buckets: [
{ key: 'foo', record_avg: { value: 80 } },
{ key: 'bar', record_avg: { value: 20 } },
],
},
},
}),
}),
anomalyDetectorsProvider: jest.fn(),
} as unknown) as MlPluginSetup;
registerTransactionDurationAnomalyAlertType({
alerting,
ml,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
const scheduleActions = jest.fn();
const services = {
callCluster: jest.fn(),
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
};
const params = { anomalySeverityType: ANOMALY_SEVERITY.MINOR };
await alertExecutor!({ services, params });
await alertExecutor!({ services, params });
[
'apm.transaction_duration_anomaly_foo_production',
'apm.transaction_duration_anomaly_foo_testing',
'apm.transaction_duration_anomaly_bar_production',
'apm.transaction_duration_anomaly_bar_testing',
].forEach((instanceName) =>
expect(services.alertInstanceFactory).toHaveBeenCalledWith(instanceName)
);
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
transactionType: undefined,
environment: 'production',
threshold: 'minor',
thresholdValue: 'critical',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
transactionType: undefined,
environment: 'production',
threshold: 'minor',
thresholdValue: 'warning',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
transactionType: undefined,
environment: 'testing',
threshold: 'minor',
thresholdValue: 'critical',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
transactionType: undefined,
environment: 'testing',
threshold: 'minor',
thresholdValue: 'warning',
triggerValue: 'critical',
});
});
});


@@ -6,9 +6,16 @@
*/
import { schema } from '@kbn/config-schema';
import { Observable } from 'rxjs';
import { isEmpty } from 'lodash';
import { compact } from 'lodash';
import { ESSearchResponse } from 'typings/elasticsearch';
import { QueryContainer } from '@elastic/elasticsearch/api/types';
import { getSeverity } from '../../../common/anomaly_detection';
import {
SERVICE_ENVIRONMENT,
SERVICE_NAME,
TRANSACTION_TYPE,
} from '../../../common/elasticsearch_fieldnames';
import { asMutableArray } from '../../../common/utils/as_mutable_array';
import { ANOMALY_SEVERITY } from '../../../../ml/common';
import { KibanaRequest } from '../../../../../../src/core/server';
import {
@@ -16,17 +23,11 @@ import {
ALERT_TYPES_CONFIG,
ANOMALY_ALERT_SEVERITY_TYPES,
} from '../../../common/alert_types';
import { AlertingPlugin } from '../../../../alerting/server';
import { APMConfig } from '../..';
import { MlPluginSetup } from '../../../../ml/server';
import { getMLJobs } from '../service_map/get_service_anomalies';
import { apmActionVariables } from './action_variables';
interface RegisterAlertParams {
alerting: AlertingPlugin['setup'];
ml?: MlPluginSetup;
config$: Observable<APMConfig>;
}
import { RegisterRuleDependencies } from './register_apm_alerts';
import { parseEnvironmentUrlParam } from '../../../common/environment_filter_values';
import { createAPMLifecycleRuleType } from './create_apm_lifecycle_rule_type';
const paramsSchema = schema.object({
serviceName: schema.maybe(schema.string()),
@@ -46,203 +47,199 @@ const alertTypeConfig =
ALERT_TYPES_CONFIG[AlertType.TransactionDurationAnomaly];
export function registerTransactionDurationAnomalyAlertType({
alerting,
registry,
ml,
config$,
}: RegisterAlertParams) {
alerting.registerType({
id: AlertType.TransactionDurationAnomaly,
name: alertTypeConfig.name,
actionGroups: alertTypeConfig.actionGroups,
defaultActionGroupId: alertTypeConfig.defaultActionGroupId,
validate: {
params: paramsSchema,
},
actionVariables: {
context: [
apmActionVariables.serviceName,
apmActionVariables.transactionType,
apmActionVariables.environment,
apmActionVariables.threshold,
apmActionVariables.triggerValue,
],
},
producer: 'apm',
minimumLicenseRequired: 'basic',
executor: async ({ services, params, state }) => {
if (!ml) {
return;
}
const alertParams = params;
const request = {} as KibanaRequest;
const { mlAnomalySearch } = ml.mlSystemProvider(
request,
services.savedObjectsClient
);
const anomalyDetectors = ml.anomalyDetectorsProvider(
request,
services.savedObjectsClient
);
const mlJobs = await getMLJobs(anomalyDetectors, alertParams.environment);
const selectedOption = ANOMALY_ALERT_SEVERITY_TYPES.find(
(option) => option.type === alertParams.anomalySeverityType
);
if (!selectedOption) {
throw new Error(
`Anomaly alert severity type ${alertParams.anomalySeverityType} is not supported.`
);
}
const threshold = selectedOption.threshold;
if (mlJobs.length === 0) {
return {};
}
const jobIds = mlJobs.map((job) => job.job_id);
const anomalySearchParams = {
terminateAfter: 1,
body: {
size: 0,
query: {
bool: {
filter: [
{ term: { result_type: 'record' } },
{ terms: { job_id: jobIds } },
{
range: {
timestamp: {
gte: `now-${alertParams.windowSize}${alertParams.windowUnit}`,
format: 'epoch_millis',
},
},
},
...(alertParams.serviceName
? [
{
term: {
partition_field_value: alertParams.serviceName,
},
},
]
: []),
...(alertParams.transactionType
? [
{
term: {
by_field_value: alertParams.transactionType,
},
},
]
: []),
{
range: {
record_score: {
gte: threshold,
},
},
},
],
},
},
aggs: {
services: {
terms: {
field: 'partition_field_value',
size: 50,
},
aggs: {
transaction_types: {
terms: {
field: 'by_field_value',
},
},
record_avg: {
avg: {
field: 'record_score',
},
},
},
},
},
},
};
const response = ((await mlAnomalySearch(
anomalySearchParams,
jobIds
)) as unknown) as {
hits: { total: { value: number } };
aggregations?: {
services: {
buckets: Array<{
key: string;
record_avg: { value: number };
transaction_types: { buckets: Array<{ key: string }> };
}>;
};
};
};
const hitCount = response.hits.total.value;
if (hitCount > 0) {
function scheduleAction({
serviceName,
severity,
environment,
transactionType,
}: {
serviceName: string;
severity: string;
environment?: string;
transactionType?: string;
}) {
const alertInstanceName = [
AlertType.TransactionDurationAnomaly,
serviceName,
environment,
transactionType,
]
.filter((name) => name)
.join('_');
const alertInstance = services.alertInstanceFactory(
alertInstanceName
);
alertInstance.scheduleActions(alertTypeConfig.defaultActionGroupId, {
serviceName,
environment,
transactionType,
threshold: selectedOption?.label,
thresholdValue: severity,
});
logger,
}: RegisterRuleDependencies) {
registry.registerType(
createAPMLifecycleRuleType({
id: AlertType.TransactionDurationAnomaly,
name: alertTypeConfig.name,
actionGroups: alertTypeConfig.actionGroups,
defaultActionGroupId: alertTypeConfig.defaultActionGroupId,
validate: {
params: paramsSchema,
},
actionVariables: {
context: [
apmActionVariables.serviceName,
apmActionVariables.transactionType,
apmActionVariables.environment,
apmActionVariables.threshold,
apmActionVariables.triggerValue,
],
},
producer: 'apm',
minimumLicenseRequired: 'basic',
executor: async ({ services, params }) => {
if (!ml) {
return {};
}
mlJobs.map((job) => {
const environment = job.custom_settings?.job_tags?.environment;
response.aggregations?.services.buckets.forEach((serviceBucket) => {
const serviceName = serviceBucket.key as string;
const severity = getSeverity(serviceBucket.record_avg.value);
if (isEmpty(serviceBucket.transaction_types?.buckets)) {
scheduleAction({ serviceName, severity, environment });
} else {
serviceBucket.transaction_types?.buckets.forEach((typeBucket) => {
const transactionType = typeBucket.key as string;
scheduleAction({
serviceName,
severity,
environment,
transactionType,
});
});
}
});
const alertParams = params;
const request = {} as KibanaRequest;
const { mlAnomalySearch } = ml.mlSystemProvider(
request,
services.savedObjectsClient
);
const anomalyDetectors = ml.anomalyDetectorsProvider(
request,
services.savedObjectsClient
);
const mlJobs = await getMLJobs(
anomalyDetectors,
alertParams.environment
);
const selectedOption = ANOMALY_ALERT_SEVERITY_TYPES.find(
(option) => option.type === alertParams.anomalySeverityType
);
if (!selectedOption) {
throw new Error(
`Anomaly alert severity type ${alertParams.anomalySeverityType} is not supported.`
);
}
const threshold = selectedOption.threshold;
if (mlJobs.length === 0) {
return {};
}
const jobIds = mlJobs.map((job) => job.job_id);
const anomalySearchParams = {
body: {
size: 0,
query: {
bool: {
filter: [
{ term: { result_type: 'record' } },
{ terms: { job_id: jobIds } },
{ term: { is_interim: false } },
{
range: {
timestamp: {
gte: `now-${alertParams.windowSize}${alertParams.windowUnit}`,
format: 'epoch_millis',
},
},
},
...(alertParams.serviceName
? [
{
term: {
partition_field_value: alertParams.serviceName,
},
},
]
: []),
...(alertParams.transactionType
? [
{
term: {
by_field_value: alertParams.transactionType,
},
},
]
: []),
] as QueryContainer[],
},
},
aggs: {
anomaly_groups: {
multi_terms: {
terms: [
{ field: 'partition_field_value' },
{ field: 'by_field_value' },
{ field: 'job_id' },
],
size: 10000,
},
aggs: {
latest_score: {
top_metrics: {
metrics: asMutableArray([
{ field: 'record_score' },
{ field: 'partition_field_value' },
{ field: 'by_field_value' },
{ field: 'job_id' },
] as const),
sort: {
'@timestamp': 'desc' as const,
},
},
},
},
},
},
},
};
const response: ESSearchResponse<
unknown,
typeof anomalySearchParams
> = (await mlAnomalySearch(anomalySearchParams, [])) as any;
const anomalies =
response.aggregations?.anomaly_groups.buckets
.map((bucket) => {
const latest = bucket.latest_score.top[0].metrics;
const job = mlJobs.find((j) => j.job_id === latest.job_id);
if (!job) {
logger.warn(
`Could not find matching job for job id ${latest.job_id}`
);
return undefined;
}
return {
serviceName: latest.partition_field_value as string,
transactionType: latest.by_field_value as string,
environment: job.custom_settings!.job_tags!.environment,
score: latest.record_score as number,
};
})
.filter((anomaly) =>
anomaly ? anomaly.score >= threshold : false
) ?? [];
compact(anomalies).forEach((anomaly) => {
const { serviceName, environment, transactionType, score } = anomaly;
const parsedEnvironment = parseEnvironmentUrlParam(environment);
services
.alertWithLifecycle({
id: [
AlertType.TransactionDurationAnomaly,
serviceName,
environment,
transactionType,
]
.filter((name) => name)
.join('_'),
fields: {
[SERVICE_NAME]: serviceName,
...(parsedEnvironment.esFieldValue
? { [SERVICE_ENVIRONMENT]: environment }
: {}),
[TRANSACTION_TYPE]: transactionType,
},
})
.scheduleActions(alertTypeConfig.defaultActionGroupId, {
serviceName,
transactionType,
environment,
threshold: selectedOption?.label,
triggerValue: getSeverity(score),
});
});
}
},
});
return {};
},
})
);
}


@@ -5,48 +5,19 @@
* 2.0.
*/
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { toArray, map } from 'rxjs/operators';
import { AlertingPlugin } from '../../../../alerting/server';
import { APMConfig } from '../..';
import { registerTransactionErrorRateAlertType } from './register_transaction_error_rate_alert_type';
import { elasticsearchServiceMock } from 'src/core/server/mocks';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { elasticsearchClientMock } from 'src/core/server/elasticsearch/client/mocks';
type Operator<T1, T2> = (source: Rx.Observable<T1>) => Rx.Observable<T2>;
const pipeClosure = <T1, T2>(fn: Operator<T1, T2>): Operator<T1, T2> => {
return (source: Rx.Observable<T1>) => {
return Rx.defer(() => fn(source));
};
};
const mockedConfig$ = (Rx.of('apm_oss.errorIndices').pipe(
pipeClosure((source$) => {
return source$.pipe(map((i) => i));
}),
toArray()
) as unknown) as Observable<APMConfig>;
import { createRuleTypeMocks } from './test_utils';
describe('Transaction error rate alert', () => {
it("doesn't send an alert when rate is less than threshold", async () => {
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
const { services, dependencies, executor } = createRuleTypeMocks();
registerTransactionErrorRateAlertType({
alerting,
config$: mockedConfig$,
...dependencies,
});
expect(alertExecutor).toBeDefined();
const services = {
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
alertInstanceFactory: jest.fn(),
};
const params = { threshold: 1 };
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
@@ -60,6 +31,11 @@ describe('Transaction error rate alert', () => {
},
took: 0,
timed_out: false,
aggregations: {
series: {
buckets: [],
},
},
_shards: {
failed: 0,
skipped: 0,
@@ -69,30 +45,21 @@ describe('Transaction error rate alert', () => {
})
);
await alertExecutor!({ services, params });
await executor({ params });
expect(services.alertInstanceFactory).not.toBeCalled();
});
it('sends alerts with service name, transaction type and environment', async () => {
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
it('sends alerts for services that exceeded the threshold', async () => {
const {
services,
dependencies,
executor,
scheduleActions,
} = createRuleTypeMocks();
registerTransactionErrorRateAlertType({
alerting,
config$: mockedConfig$,
...dependencies,
});
expect(alertExecutor).toBeDefined();
const scheduleActions = jest.fn();
const services = {
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
};
const params = { threshold: 10, windowSize: 5, windowUnit: 'm' };
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
@@ -100,37 +67,38 @@ describe('Transaction error rate alert', () => {
hits: [],
total: {
relation: 'eq',
value: 4,
value: 0,
},
},
aggregations: {
failed_transactions: {
doc_count: 2,
},
services: {
series: {
buckets: [
{
key: 'foo',
transaction_types: {
key: ['foo', 'env-foo', 'type-foo'],
outcomes: {
buckets: [
{
key: 'type-foo',
environments: {
buckets: [{ key: 'env-foo' }, { key: 'env-foo-2' }],
},
key: 'success',
doc_count: 90,
},
{
key: 'failure',
doc_count: 10,
},
],
},
},
{
key: 'bar',
transaction_types: {
key: ['bar', 'env-bar', 'type-bar'],
outcomes: {
buckets: [
{
key: 'type-bar',
environments: {
buckets: [{ key: 'env-bar' }, { key: 'env-bar-2' }],
},
key: 'success',
doc_count: 90,
},
{
key: 'failure',
doc_count: 1,
},
],
},
@@ -149,14 +117,17 @@ describe('Transaction error rate alert', () => {
})
);
await alertExecutor!({ services, params });
[
'apm.transaction_error_rate_foo_type-foo_env-foo',
'apm.transaction_error_rate_foo_type-foo_env-foo-2',
'apm.transaction_error_rate_bar_type-bar_env-bar',
'apm.transaction_error_rate_bar_type-bar_env-bar-2',
].forEach((instanceName) =>
expect(services.alertInstanceFactory).toHaveBeenCalledWith(instanceName)
const params = { threshold: 10, windowSize: 5, windowUnit: 'm' };
await executor({ params });
expect(services.alertInstanceFactory).toHaveBeenCalledTimes(1);
expect(services.alertInstanceFactory).toHaveBeenCalledWith(
'apm.transaction_error_rate_foo_type-foo_env-foo'
);
expect(services.alertInstanceFactory).not.toHaveBeenCalledWith(
'apm.transaction_error_rate_bar_type-bar_env-bar'
);
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
@@ -164,193 +135,7 @@ describe('Transaction error rate alert', () => {
transactionType: 'type-foo',
environment: 'env-foo',
threshold: 10,
triggerValue: '50',
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
transactionType: 'type-foo',
environment: 'env-foo-2',
threshold: 10,
triggerValue: '50',
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
transactionType: 'type-bar',
environment: 'env-bar',
threshold: 10,
triggerValue: '50',
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
transactionType: 'type-bar',
environment: 'env-bar-2',
threshold: 10,
triggerValue: '50',
interval: '5m',
});
});
it('sends alerts with service name and transaction type', async () => {
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
registerTransactionErrorRateAlertType({
alerting,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
const scheduleActions = jest.fn();
const services = {
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
};
const params = { threshold: 10, windowSize: 5, windowUnit: 'm' };
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [],
total: {
relation: 'eq',
value: 4,
},
},
aggregations: {
failed_transactions: {
doc_count: 2,
},
services: {
buckets: [
{
key: 'foo',
transaction_types: {
buckets: [{ key: 'type-foo' }],
},
},
{
key: 'bar',
transaction_types: {
buckets: [{ key: 'type-bar' }],
},
},
],
},
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);
await alertExecutor!({ services, params });
[
'apm.transaction_error_rate_foo_type-foo',
'apm.transaction_error_rate_bar_type-bar',
].forEach((instanceName) =>
expect(services.alertInstanceFactory).toHaveBeenCalledWith(instanceName)
);
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
transactionType: 'type-foo',
environment: undefined,
threshold: 10,
triggerValue: '50',
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
transactionType: 'type-bar',
environment: undefined,
threshold: 10,
triggerValue: '50',
interval: '5m',
});
});
it('sends alerts with service name', async () => {
let alertExecutor: any;
const alerting = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as AlertingPlugin['setup'];
registerTransactionErrorRateAlertType({
alerting,
config$: mockedConfig$,
});
expect(alertExecutor).toBeDefined();
const scheduleActions = jest.fn();
const services = {
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
};
const params = { threshold: 10, windowSize: 5, windowUnit: 'm' };
services.scopedClusterClient.asCurrentUser.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [],
total: {
value: 4,
relation: 'eq',
},
},
aggregations: {
failed_transactions: {
doc_count: 2,
},
services: {
buckets: [{ key: 'foo' }, { key: 'bar' }],
},
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);
await alertExecutor!({ services, params });
[
'apm.transaction_error_rate_foo',
'apm.transaction_error_rate_bar',
].forEach((instanceName) =>
expect(services.alertInstanceFactory).toHaveBeenCalledWith(instanceName)
);
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'foo',
transactionType: undefined,
environment: undefined,
threshold: 10,
triggerValue: '50',
interval: '5m',
});
expect(scheduleActions).toHaveBeenCalledWith('threshold_met', {
serviceName: 'bar',
transactionType: undefined,
environment: undefined,
threshold: 10,
triggerValue: '50',
triggerValue: '10',
interval: '5m',
});
});


@@ -6,11 +6,7 @@
*/
import { schema } from '@kbn/config-schema';
import { isEmpty } from 'lodash';
import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';
import { APMConfig } from '../..';
import { AlertingPlugin } from '../../../../alerting/server';
import { AlertType, ALERT_TYPES_CONFIG } from '../../../common/alert_types';
import {
EVENT_OUTCOME,
@@ -26,11 +22,8 @@ import { environmentQuery } from '../../../server/utils/queries';
import { getApmIndices } from '../settings/apm_indices/get_apm_indices';
import { apmActionVariables } from './action_variables';
import { alertingEsClient } from './alerting_es_client';
interface RegisterAlertParams {
alerting: AlertingPlugin['setup'];
config$: Observable<APMConfig>;
}
import { createAPMLifecycleRuleType } from './create_apm_lifecycle_rule_type';
import { RegisterRuleDependencies } from './register_apm_alerts';
const paramsSchema = schema.object({
windowSize: schema.number(),
@@ -44,158 +37,165 @@ const paramsSchema = schema.object({
const alertTypeConfig = ALERT_TYPES_CONFIG[AlertType.TransactionErrorRate];
export function registerTransactionErrorRateAlertType({
alerting,
registry,
config$,
}: RegisterAlertParams) {
alerting.registerType({
id: AlertType.TransactionErrorRate,
name: alertTypeConfig.name,
actionGroups: alertTypeConfig.actionGroups,
defaultActionGroupId: alertTypeConfig.defaultActionGroupId,
validate: {
params: paramsSchema,
},
actionVariables: {
context: [
apmActionVariables.transactionType,
apmActionVariables.serviceName,
apmActionVariables.environment,
apmActionVariables.threshold,
apmActionVariables.triggerValue,
apmActionVariables.interval,
],
},
producer: 'apm',
minimumLicenseRequired: 'basic',
executor: async ({ services, params: alertParams }) => {
const config = await config$.pipe(take(1)).toPromise();
const indices = await getApmIndices({
config,
savedObjectsClient: services.savedObjectsClient,
});
const maxServiceEnvironments = config['xpack.apm.maxServiceEnvironments'];
}: RegisterRuleDependencies) {
registry.registerType(
createAPMLifecycleRuleType({
id: AlertType.TransactionErrorRate,
name: alertTypeConfig.name,
actionGroups: alertTypeConfig.actionGroups,
defaultActionGroupId: alertTypeConfig.defaultActionGroupId,
validate: {
params: paramsSchema,
},
actionVariables: {
context: [
apmActionVariables.transactionType,
apmActionVariables.serviceName,
apmActionVariables.environment,
apmActionVariables.threshold,
apmActionVariables.triggerValue,
apmActionVariables.interval,
],
},
producer: 'apm',
minimumLicenseRequired: 'basic',
executor: async ({ services, params: alertParams }) => {
const config = await config$.pipe(take(1)).toPromise();
const indices = await getApmIndices({
config,
savedObjectsClient: services.savedObjectsClient,
});
const searchParams = {
index: indices['apm_oss.transactionIndices'],
size: 0,
body: {
track_total_hits: true,
query: {
bool: {
filter: [
{
range: {
'@timestamp': {
gte: `now-${alertParams.windowSize}${alertParams.windowUnit}`,
const searchParams = {
index: indices['apm_oss.transactionIndices'],
size: 1,
body: {
query: {
bool: {
filter: [
{
range: {
'@timestamp': {
gte: `now-${alertParams.windowSize}${alertParams.windowUnit}`,
},
},
},
},
{ term: { [PROCESSOR_EVENT]: ProcessorEvent.transaction } },
...(alertParams.serviceName
? [{ term: { [SERVICE_NAME]: alertParams.serviceName } }]
: []),
...(alertParams.transactionType
? [
{
term: {
[TRANSACTION_TYPE]: alertParams.transactionType,
{ term: { [PROCESSOR_EVENT]: ProcessorEvent.transaction } },
{
terms: {
[EVENT_OUTCOME]: [
EventOutcome.failure,
EventOutcome.success,
],
},
},
...(alertParams.serviceName
? [{ term: { [SERVICE_NAME]: alertParams.serviceName } }]
: []),
...(alertParams.transactionType
? [
{
term: {
[TRANSACTION_TYPE]: alertParams.transactionType,
},
},
},
]
: []),
...environmentQuery(alertParams.environment),
],
},
},
aggs: {
failed_transactions: {
filter: { term: { [EVENT_OUTCOME]: EventOutcome.failure } },
},
services: {
terms: {
field: SERVICE_NAME,
size: 50,
]
: []),
...environmentQuery(alertParams.environment),
],
},
aggs: {
transaction_types: {
terms: { field: TRANSACTION_TYPE },
aggs: {
environments: {
terms: {
field: SERVICE_ENVIRONMENT,
size: maxServiceEnvironments,
},
},
aggs: {
series: {
multi_terms: {
terms: [
{ field: SERVICE_NAME },
{ field: SERVICE_ENVIRONMENT, missing: '' },
{ field: TRANSACTION_TYPE },
],
size: 10000,
},
aggs: {
outcomes: {
terms: {
field: EVENT_OUTCOME,
},
},
},
},
},
},
},
};
};
const { body: response } = await alertingEsClient(services, searchParams);
if (!response.aggregations) {
return;
}
const response = await alertingEsClient(
services.scopedClusterClient,
searchParams
);
const failedTransactionCount =
response.aggregations.failed_transactions.doc_count;
const totalTransactionCount = response.hits.total.value;
const transactionErrorRate =
(failedTransactionCount / totalTransactionCount) * 100;
if (transactionErrorRate > alertParams.threshold) {
function scheduleAction({
serviceName,
environment,
transactionType,
}: {
serviceName: string;
environment?: string;
transactionType?: string;
}) {
const alertInstanceName = [
AlertType.TransactionErrorRate,
serviceName,
transactionType,
environment,
]
.filter((name) => name)
.join('_');
const alertInstance = services.alertInstanceFactory(
alertInstanceName
);
alertInstance.scheduleActions(alertTypeConfig.defaultActionGroupId, {
serviceName,
transactionType,
environment,
threshold: alertParams.threshold,
triggerValue: asDecimalOrInteger(transactionErrorRate),
interval: `${alertParams.windowSize}${alertParams.windowUnit}`,
});
if (!response.aggregations) {
return {};
}
response.aggregations?.services.buckets.forEach((serviceBucket) => {
const serviceName = serviceBucket.key as string;
if (isEmpty(serviceBucket.transaction_types?.buckets)) {
scheduleAction({ serviceName });
} else {
serviceBucket.transaction_types.buckets.forEach((typeBucket) => {
const transactionType = typeBucket.key as string;
if (isEmpty(typeBucket.environments?.buckets)) {
scheduleAction({ serviceName, transactionType });
} else {
typeBucket.environments.buckets.forEach((envBucket) => {
const environment = envBucket.key as string;
scheduleAction({ serviceName, transactionType, environment });
});
}
const results = response.aggregations.series.buckets
.map((bucket) => {
const [serviceName, environment, transactionType] = bucket.key;
const failed =
bucket.outcomes.buckets.find(
(outcomeBucket) => outcomeBucket.key === EventOutcome.failure
)?.doc_count ?? 0;
const successful =
bucket.outcomes.buckets.find(
(outcomeBucket) => outcomeBucket.key === EventOutcome.success
)?.doc_count ?? 0;
return {
serviceName,
environment,
transactionType,
errorRate: (failed / (failed + successful)) * 100,
};
})
.filter((result) => result.errorRate >= alertParams.threshold);
results.forEach((result) => {
const {
serviceName,
environment,
transactionType,
errorRate,
} = result;
services
.alertWithLifecycle({
id: [
AlertType.TransactionErrorRate,
serviceName,
transactionType,
environment,
]
.filter((name) => name)
.join('_'),
fields: {
[SERVICE_NAME]: serviceName,
...(environment ? { [SERVICE_ENVIRONMENT]: environment } : {}),
[TRANSACTION_TYPE]: transactionType,
},
})
.scheduleActions(alertTypeConfig.defaultActionGroupId, {
serviceName,
transactionType,
environment,
threshold: alertParams.threshold,
triggerValue: asDecimalOrInteger(errorRate),
interval: `${alertParams.windowSize}${alertParams.windowUnit}`,
});
}
});
}
},
});
return {};
},
})
);
}

View file

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from 'kibana/server';
import { of } from 'rxjs';
import { elasticsearchServiceMock } from 'src/core/server/mocks';
import { APMConfig } from '../../..';
import { APMRuleRegistry } from '../../../plugin';
export const createRuleTypeMocks = () => {
let alertExecutor: (...args: any[]) => Promise<any>;
const mockedConfig$ = of({
/* eslint-disable @typescript-eslint/naming-convention */
'apm_oss.errorIndices': 'apm-*',
'apm_oss.transactionIndices': 'apm-*',
/* eslint-enable @typescript-eslint/naming-convention */
} as APMConfig);
const loggerMock = ({
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
} as unknown) as Logger;
const registry = {
registerType: ({ executor }) => {
alertExecutor = executor;
},
} as APMRuleRegistry;
const scheduleActions = jest.fn();
const services = {
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
scopedRuleRegistryClient: {
bulkIndex: jest.fn(),
},
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
alertWithLifecycle: jest.fn(),
logger: loggerMock,
};
return {
dependencies: {
registry,
config$: mockedConfig$,
logger: loggerMock,
},
services,
scheduleActions,
executor: async ({ params }: { params: Record<string, any> }) => {
return alertExecutor({
services,
params,
startedAt: new Date(),
});
},
};
};

View file

@ -43,6 +43,8 @@ import {
import { registerRoutes } from './routes/register_routes';
import { getGlobalApmServerRouteRepository } from './routes/get_global_apm_server_route_repository';
export type APMRuleRegistry = ReturnType<APMPlugin['setup']>['ruleRegistry'];
export class APMPlugin
implements
Plugin<
@ -72,15 +74,6 @@ export class APMPlugin
core.uiSettings.register(uiSettings);
if (plugins.actions && plugins.alerting) {
registerApmAlerts({
alerting: plugins.alerting,
actions: plugins.actions,
ml: plugins.ml,
config$: mergedConfig$,
});
}
const currentConfig = mergeConfigs(
plugins.apmOss.config,
this.initContext.config.get<APMXPackConfig>()
@ -157,6 +150,28 @@ export class APMPlugin
config: await mergedConfig$.pipe(take(1)).toPromise(),
});
const apmRuleRegistry = plugins.observability.ruleRegistry.create({
name: 'apm',
fieldMap: {
'service.environment': {
type: 'keyword',
},
'transaction.type': {
type: 'keyword',
},
'processor.event': {
type: 'keyword',
},
},
});
registerApmAlerts({
registry: apmRuleRegistry,
ml: plugins.ml,
config$: mergedConfig$,
logger: this.logger!.get('rule'),
});
return {
config$: mergedConfig$,
getApmIndices: boundGetApmIndices,
@ -186,6 +201,7 @@ export class APMPlugin
},
});
},
ruleRegistry: apmRuleRegistry,
};
}

View file

@ -60,21 +60,3 @@ export interface APMRouteHandlerResources {
};
};
}
// export type Client<
// TRouteState,
// TOptions extends { abortable: boolean } = { abortable: true }
// > = <TEndpoint extends keyof TRouteState & string>(
// options: Omit<
// FetchOptions,
// 'query' | 'body' | 'pathname' | 'method' | 'signal'
// > & {
// forceCache?: boolean;
// endpoint: TEndpoint;
// } & MaybeParams<TRouteState, TEndpoint> &
// (TOptions extends { abortable: true } ? { signal: AbortSignal | null } : {})
// ) => Promise<
// TRouteState[TEndpoint] extends { ret: any }
// ? TRouteState[TEndpoint]['ret']
// : unknown
// >;

View file

@ -125,6 +125,7 @@ const requiredDependencies = [
'triggersActionsUi',
'embeddable',
'infra',
'observability',
] as const;
const optionalDependencies = [
@ -134,7 +135,6 @@ const optionalDependencies = [
'taskManager',
'actions',
'alerting',
'observability',
'security',
'ml',
'home',

View file

@ -38,6 +38,7 @@
{ "path": "../ml/tsconfig.json" },
{ "path": "../observability/tsconfig.json" },
{ "path": "../reporting/tsconfig.json" },
{ "path": "../rule_registry/tsconfig.json" },
{ "path": "../security/tsconfig.json" },
{ "path": "../task_manager/tsconfig.json" },
{ "path": "../triggers_actions_ui/tsconfig.json" }

View file

@ -12,7 +12,6 @@ import {
IClusterClientAdapter,
EVENT_BUFFER_LENGTH,
} from './cluster_client_adapter';
import { contextMock } from './context.mock';
import { findOptionsSchema } from '../event_log_client';
import { delay } from '../lib/delay';
import { times } from 'lodash';
@ -31,7 +30,7 @@ beforeEach(() => {
clusterClientAdapter = new ClusterClientAdapter({
logger,
elasticsearchClientPromise: Promise.resolve(clusterClient),
context: contextMock.create(),
wait: () => Promise.resolve(true),
});
});

View file

@ -10,8 +10,8 @@ import { bufferTime, filter as rxFilter, switchMap } from 'rxjs/operators';
import { reject, isUndefined, isNumber } from 'lodash';
import type { PublicMethodsOf } from '@kbn/utility-types';
import { Logger, ElasticsearchClient } from 'src/core/server';
import util from 'util';
import { estypes } from '@elastic/elasticsearch';
import { EsContext } from '.';
import { IEvent, IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
import { FindOptionsType } from '../event_log_client';
import { esKuery } from '../../../../../src/plugins/data/server';
@ -26,10 +26,12 @@ export interface Doc {
body: IEvent;
}
type Wait = () => Promise<boolean>;
export interface ConstructorOpts {
logger: Logger;
elasticsearchClientPromise: Promise<ElasticsearchClient>;
context: EsContext;
wait: Wait;
}
export interface QueryEventsBySavedObjectResult {
@ -39,18 +41,21 @@ export interface QueryEventsBySavedObjectResult {
data: IValidatedEvent[];
}
export class ClusterClientAdapter {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AliasAny = any;
export class ClusterClientAdapter<TDoc extends { body: AliasAny; index: string } = Doc> {
private readonly logger: Logger;
private readonly elasticsearchClientPromise: Promise<ElasticsearchClient>;
private readonly docBuffer$: Subject<Doc>;
private readonly context: EsContext;
private readonly docBuffer$: Subject<TDoc>;
private readonly wait: Wait;
private readonly docsBufferedFlushed: Promise<void>;
constructor(opts: ConstructorOpts) {
this.logger = opts.logger;
this.elasticsearchClientPromise = opts.elasticsearchClientPromise;
this.context = opts.context;
this.docBuffer$ = new Subject<Doc>();
this.wait = opts.wait;
this.docBuffer$ = new Subject<TDoc>();
// buffer event log docs for time / buffer length, ignore empty
// buffers, then index the buffered docs; kick things off with a
@ -75,18 +80,21 @@ export class ClusterClientAdapter {
await this.docsBufferedFlushed;
}
public indexDocument(doc: Doc): void {
public indexDocument(doc: TDoc): void {
this.docBuffer$.next(doc);
}
async indexDocuments(docs: Doc[]): Promise<void> {
async indexDocuments(docs: TDoc[]): Promise<void> {
// If es initialization failed, don't try to index.
// Also, don't log here, we log the failure case in plugin startup
// instead, otherwise we'd be spamming the log (if done here)
if (!(await this.context.waitTillReady())) {
if (!(await this.wait())) {
this.logger.debug(`Initialization failed, not indexing ${docs.length} documents`);
return;
}
this.logger.debug(`Indexing ${docs.length} documents`);
const bulkBody: Array<Record<string, unknown>> = [];
for (const doc of docs) {
@ -98,7 +106,13 @@ export class ClusterClientAdapter {
try {
const esClient = await this.elasticsearchClientPromise;
await esClient.bulk({ body: bulkBody });
const response = await esClient.bulk({ body: bulkBody });
if (response.body.errors) {
const error = new Error('Error writing some bulk events');
error.stack += '\n' + util.inspect(response.body.items, { depth: null });
this.logger.error(error);
}
} catch (err) {
this.logger.error(
`error writing bulk events: "${err.message}"; docs: ${JSON.stringify(bulkBody)}`
@ -156,7 +170,9 @@ export class ClusterClientAdapter {
// instances at the same time.
const existsNow = await this.doesIndexTemplateExist(name);
if (!existsNow) {
throw new Error(`error creating index template: ${err.message}`);
const error = new Error(`error creating index template: ${err.message}`);
Object.assign(error, { wrapped: err });
throw error;
}
}
}

View file

@ -53,7 +53,7 @@ class EsContextImpl implements EsContext {
this.esAdapter = new ClusterClientAdapter({
logger: params.logger,
elasticsearchClientPromise: params.elasticsearchClientPromise,
context: this,
wait: () => this.readySignal.wait(),
});
}

View file

@ -20,5 +20,9 @@ export {
SAVED_OBJECT_REL_PRIMARY,
} from './types';
export { ClusterClientAdapter } from './es/cluster_client_adapter';
export { createReadySignal } from './lib/ready_signal';
export const config = { schema: ConfigSchema };
export const plugin = (context: PluginInitializerContext) => new Plugin(context);

View file

@ -3,7 +3,7 @@
"version": "8.0.0",
"kibanaVersion": "kibana",
"configPath": ["xpack", "observability"],
"optionalPlugins": ["licensing", "home", "usageCollection","lens"],
"optionalPlugins": ["licensing", "home", "usageCollection","lens", "ruleRegistry"],
"requiredPlugins": ["data"],
"ui": true,
"server": true,

View file

@ -6,29 +6,30 @@
*/
import { PluginInitializerContext, Plugin, CoreSetup } from 'src/core/server';
import { pickWithPatterns } from '../../rule_registry/server';
import { ObservabilityConfig } from '.';
import {
bootstrapAnnotations,
ScopedAnnotationsClient,
ScopedAnnotationsClientFactory,
AnnotationsAPI,
} from './lib/annotations/bootstrap_annotations';
import type { RuleRegistryPluginSetupContract } from '../../rule_registry/server';
import { uiSettings } from './ui_settings';
import { ecsFieldMap } from '../../rule_registry/server';
type LazyScopedAnnotationsClientFactory = (
...args: Parameters<ScopedAnnotationsClientFactory>
) => Promise<ScopedAnnotationsClient | undefined>;
export interface ObservabilityPluginSetup {
getScopedAnnotationsClient: LazyScopedAnnotationsClientFactory;
}
export type ObservabilityPluginSetup = ReturnType<ObservabilityPlugin['setup']>;
export class ObservabilityPlugin implements Plugin<ObservabilityPluginSetup> {
constructor(private readonly initContext: PluginInitializerContext) {
this.initContext = initContext;
}
public setup(core: CoreSetup, plugins: {}): ObservabilityPluginSetup {
public setup(
core: CoreSetup,
plugins: {
ruleRegistry: RuleRegistryPluginSetupContract;
}
) {
const config = this.initContext.config.get<ObservabilityConfig>();
let annotationsApiPromise: Promise<AnnotationsAPI> | undefined;
@ -48,10 +49,16 @@ export class ObservabilityPlugin implements Plugin<ObservabilityPluginSetup> {
}
return {
getScopedAnnotationsClient: async (...args) => {
getScopedAnnotationsClient: async (...args: Parameters<ScopedAnnotationsClientFactory>) => {
const api = await annotationsApiPromise;
return api?.getScopedAnnotationsClient(...args);
},
ruleRegistry: plugins.ruleRegistry.create({
name: 'observability',
fieldMap: {
...pickWithPatterns(ecsFieldMap, 'host.name', 'service.name'),
},
}),
};
}

View file

@ -23,6 +23,7 @@
{ "path": "../../../src/plugins/kibana_utils/tsconfig.json" },
{ "path": "../../../src/plugins/usage_collection/tsconfig.json" },
{ "path": "../alerting/tsconfig.json" },
{ "path": "../rule_registry/tsconfig.json" },
{ "path": "../licensing/tsconfig.json" },
{ "path": "../lens/tsconfig.json" },
{ "path": "../translations/tsconfig.json" }

View file

@ -0,0 +1,68 @@
The rule registry plugin aims to make it easy for rule type producers to have their rules produce the data that they need to build rich experiences on top of a unified experience, without the risk of mapping conflicts.
A rule registry creates a template, an ILM policy, and an alias. The template mappings can be configured. It also injects a client scoped to these indices.
It also supports inheritance, which means that producers can create a registry specific to their solution or rule type, and specify additional mappings to be used.
The rule registry plugin creates a root rule registry, with the mappings needed to build a unified experience. Rule type producers can use the plugin to access the root rule registry and create their own registry that branches off of it. The rule registry client sees data from its own registry and from all registries that branch off of it, but not from its parents.
## Creating a rule registry
To create a rule registry, producers should add the `ruleRegistry` plugin to their dependencies. They can then use the `ruleRegistry.create` method to create a child registry, specifying any additional mappings via `fieldMap`:
```ts
const observabilityRegistry = plugins.ruleRegistry.create({
name: 'observability',
fieldMap: {
...pickWithPatterns(ecsFieldMap, 'host.name', 'service.name'),
},
})
```
`fieldMap` is a key-value map of field names and mapping options:
```ts
{
'@timestamp': {
type: 'date',
array: false,
required: true,
}
}
```
ECS mappings are generated via a script in the rule registry plugin directory. These mappings are available in `x-pack/plugins/rule_registry/server/generated/ecs_field_map.ts`.
To pick many fields at once, you can use `pickWithPatterns`, which supports trailing wildcards with full type support.
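For example, a minimal sketch of picking all `event.*` fields plus two exact field names (the relative import path is illustrative and depends on where the consuming plugin lives):

```ts
import { ecsFieldMap, pickWithPatterns } from '../../rule_registry/server';

// Trailing wildcards and exact field names can be mixed; the result keeps the
// full mapping (and type) information of every matched field.
const fields = pickWithPatterns(ecsFieldMap, 'event.*', 'host.name', 'service.name');
```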
Once a registry is created, it initializes as soon as the core services it needs become available. It creates a (versioned) index template, alias, and ILM policy, but only if they do not exist yet.
### Rule registry client
The rule registry client can either be injected into the rule executor, or created in the scope of a request. It exposes a `search` method and a `bulkIndex` method. When `search` is called, the client first fetches all the rules the current user has access to, and filters the search request by those rule ids, so users only ever see data from rules they are allowed to access.
Both `search` and `bulkIndex` are fully typed, in the sense that they reflect the mappings defined for the registry.
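For illustration, a sketch of a typed search against the root registry fields, assuming a `scopedRuleRegistryClient` that was injected into a rule executor (the query itself is made up):

```ts
const { events } = await scopedRuleRegistryClient.search({
  body: {
    query: {
      term: { 'kibana.rac.alert.status': 'open' },
    },
    size: 100,
    // `fields` drives the type of `events`: only the requested fields are typed.
    fields: ['rule.uuid', 'kibana.rac.alert.*'],
    sort: { '@timestamp': 'desc' as const },
  },
});

// The alert uuid is available here without casting, because it matches one of
// the requested field patterns.
const openAlertUuids = events.map((event) => event['kibana.rac.alert.uuid']);
```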
### Schema
The following fields are available in the root rule registry:
- `@timestamp`: the ISO timestamp of the alert event. For the lifecycle rule type helper, it is always the value of `startedAt` that is injected by the Kibana alerting framework.
- `event.kind`: signal (for the changeable alert document), state (for the state changes of the alert, e.g. when it opens, recovers, or changes in severity), or metric (individual evaluations that might be related to an alert).
- `event.action`: the reason for the event. This might be `open`, `close`, `active`, or `evaluate`.
- `tags`: tags attached to the alert. Right now they are copied over from the rule.
- `rule.id`: the identifier of the rule type, e.g. `apm.transaction_duration`.
- `rule.uuid`: the saved object id of the rule.
- `rule.name`: the name of the rule (as specified by the user).
- `rule.category`: the name of the rule type (as defined by the rule type producer).
- `kibana.rac.producer`: the producer of the rule type, usually a Kibana plugin, e.g. `APM`.
- `kibana.rac.alert.id`: the id of the alert, that is unique within the context of the rule execution it was created in. E.g., for a rule that monitors latency for all services in all environments, this might be `opbeans-java:production`.
- `kibana.rac.alert.uuid`: the unique identifier for the alert during its lifespan. If an alert recovers (or closes), this identifier is re-generated when it is opened again.
- `kibana.rac.alert.status`: the status of the alert. Can be `open` or `closed`.
- `kibana.rac.alert.start`: the ISO timestamp of the time at which the alert started.
- `kibana.rac.alert.end`: the ISO timestamp of the time at which the alert recovered.
- `kibana.rac.alert.duration.us`: the duration of the alert, in microseconds. This is always the difference between the time the alert started and either the current time (while the alert is still open) or the time at which it recovered.
- `kibana.rac.alert.severity.level`: the severity of the alert, as a keyword (e.g. critical).
- `kibana.rac.alert.severity.value`: the severity of the alert, as a numerical value, which allows sorting.
This list is not final - it is just a start. Field names might change or be moved to a scoped registry. If we implement log and sequence based rule types, the list of fields will grow. If a rule type needs additional fields, the recommendation is to add the field to its own registry first (or to its producer's registry), and to move it to the root registry once usage is more broadly adopted.
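To make the schema more concrete, here is a hypothetical `event.kind: state` document for an APM alert. All values are invented for illustration; only the field names come from the root registry.

```ts
// Every value below is made up; the uuids are placeholders, not real ids.
const exampleAlertStateDocument = {
  '@timestamp': '2021-04-01T10:00:00.000Z',
  'event.kind': 'state',
  'event.action': 'active',
  tags: ['production'],
  'rule.id': 'apm.transaction_duration',
  'rule.uuid': '0af1a2b3-c4d5-46e7-88f9-0a1b2c3d4e5f',
  'rule.name': 'Latency threshold | opbeans-java',
  'rule.category': 'Latency threshold',
  'kibana.rac.producer': 'apm',
  'kibana.rac.alert.id': 'opbeans-java:production',
  'kibana.rac.alert.uuid': '9bc2a3d4-e5f6-47a8-99b0-1c2d3e4f5a6b',
  'kibana.rac.alert.status': 'open',
  'kibana.rac.alert.start': '2021-04-01T09:50:00.000Z',
  // 10 minutes between start and the current execution time, in microseconds.
  'kibana.rac.alert.duration.us': 600000000,
  'kibana.rac.alert.severity.level': 'critical',
  'kibana.rac.alert.severity.value': 90,
};
```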

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './types';

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export enum AlertSeverityLevel {
warning = 'warning',
critical = 'critical',
}
const alertSeverityLevelValues = {
[AlertSeverityLevel.warning]: 70,
[AlertSeverityLevel.critical]: 90,
};
export function getAlertSeverityLevelValue(level: AlertSeverityLevel) {
return alertSeverityLevelValues[level];
}

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
module.exports = {
preset: '@kbn/test',
rootDir: '../../..',
roots: ['<rootDir>/x-pack/plugins/rule_registry'],
};

View file

@ -0,0 +1,13 @@
{
"id": "ruleRegistry",
"version": "8.0.0",
"kibanaVersion": "kibana",
"configPath": [
"xpack",
"ruleRegistry"
],
"requiredPlugins": [
"alerting"
],
"server": true
}

View file

@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
const path = require('path');
const fs = require('fs');
const util = require('util');
const yaml = require('js-yaml');
const { exec: execCb } = require('child_process');
const { mapValues } = require('lodash');
const exists = util.promisify(fs.exists);
const readFile = util.promisify(fs.readFile);
const writeFile = util.promisify(fs.writeFile);
const mkdir = util.promisify(fs.mkdir);
const rmdir = util.promisify(fs.rmdir);
const exec = util.promisify(execCb);
const ecsDir = path.resolve(__dirname, '../../../../../../ecs');
const ecsTemplateFilename = path.join(ecsDir, 'generated/elasticsearch/7/template.json');
const flatYamlFilename = path.join(ecsDir, 'generated/ecs/ecs_flat.yml');
const outputDir = path.join(__dirname, '../../server/generated');
const outputFieldMapFilename = path.join(outputDir, 'ecs_field_map.ts');
const outputMappingFilename = path.join(outputDir, 'ecs_mappings.json');
async function generate() {
const allExists = await Promise.all([exists(ecsDir), exists(ecsTemplateFilename)]);
if (!allExists.every(Boolean)) {
throw new Error(
`Directory not found: ${ecsDir} - did you checkout elastic/ecs as a peer of this repo?`
);
}
const [template, flatYaml] = await Promise.all([
readFile(ecsTemplateFilename, { encoding: 'utf-8' }).then((str) => JSON.parse(str)),
(async () => yaml.safeLoad(await readFile(flatYamlFilename)))(),
]);
const mappings = {
properties: template.mappings.properties,
};
const fields = mapValues(flatYaml, (description) => {
return {
type: description.type,
array: description.normalize.includes('array'),
required: !!description.required,
};
});
const hasOutputDir = await exists(outputDir);
if (hasOutputDir) {
await rmdir(outputDir, { recursive: true });
}
await mkdir(outputDir);
await Promise.all([
writeFile(
outputFieldMapFilename,
`
export const ecsFieldMap = ${JSON.stringify(fields, null, 2)} as const
`,
{ encoding: 'utf-8' }
).then(() => {
return exec(`node scripts/eslint --fix ${outputFieldMapFilename}`);
}),
writeFile(outputMappingFilename, JSON.stringify(mappings, null, 2)),
]);
}
generate().catch((err) => {
console.log(err);
process.exit(1);
});

File diff suppressed because it is too large

File diff suppressed because it is too large

View file

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema, TypeOf } from '@kbn/config-schema';
import { PluginInitializerContext } from 'src/core/server';
import { RuleRegistryPlugin } from './plugin';
export { RuleRegistryPluginSetupContract } from './plugin';
export { createLifecycleRuleTypeFactory } from './rule_registry/rule_type_helpers/create_lifecycle_rule_type_factory';
export { ecsFieldMap } from './generated/ecs_field_map';
export { pickWithPatterns } from './rule_registry/field_map/pick_with_patterns';
export { FieldMapOf } from './types';
export { ScopedRuleRegistryClient } from './rule_registry/create_scoped_rule_registry_client/types';
export const config = {
schema: schema.object({
enabled: schema.boolean({ defaultValue: true }),
writeEnabled: schema.boolean({ defaultValue: false }),
}),
};
export type RuleRegistryConfig = TypeOf<typeof config.schema>;
export const plugin = (initContext: PluginInitializerContext) =>
new RuleRegistryPlugin(initContext);

View file

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { PluginInitializerContext, Plugin, CoreSetup } from 'src/core/server';
import { PluginSetupContract as AlertingPluginSetupContract } from '../../alerting/server';
import { RuleRegistry } from './rule_registry';
import { defaultIlmPolicy } from './rule_registry/defaults/ilm_policy';
import { defaultFieldMap } from './rule_registry/defaults/field_map';
import { RuleRegistryConfig } from '.';
export type RuleRegistryPluginSetupContract = RuleRegistry<typeof defaultFieldMap>;
export class RuleRegistryPlugin implements Plugin<RuleRegistryPluginSetupContract> {
constructor(private readonly initContext: PluginInitializerContext) {
this.initContext = initContext;
}
public setup(
core: CoreSetup,
plugins: { alerting: AlertingPluginSetupContract }
): RuleRegistryPluginSetupContract {
const globalConfig = this.initContext.config.legacy.get();
const config = this.initContext.config.get<RuleRegistryConfig>();
const logger = this.initContext.logger.get();
const rootRegistry = new RuleRegistry({
coreSetup: core,
ilmPolicy: defaultIlmPolicy,
fieldMap: defaultFieldMap,
kibanaIndex: globalConfig.kibana.index,
name: 'alerts',
kibanaVersion: this.initContext.env.packageInfo.version,
logger: logger.get('root'),
alertingPluginSetupContract: plugins.alerting,
writeEnabled: config.writeEnabled,
});
return rootRegistry;
}
public start() {}
public stop() {}
}

View file

@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Either, isLeft, isRight } from 'fp-ts/lib/Either';
import { Errors } from 'io-ts';
import { PathReporter } from 'io-ts/lib/PathReporter';
import { Logger, SavedObjectsClientContract } from 'kibana/server';
import { IScopedClusterClient as ScopedClusterClient } from 'src/core/server';
import { compact } from 'lodash';
import { ESSearchRequest } from 'typings/elasticsearch';
import { ClusterClientAdapter } from '../../../../event_log/server';
import { runtimeTypeFromFieldMap, OutputOfFieldMap } from '../field_map/runtime_type_from_fieldmap';
import { ScopedRuleRegistryClient, EventsOf } from './types';
import { DefaultFieldMap } from '../defaults/field_map';
const getRuleUuids = async ({
savedObjectsClient,
namespace,
}: {
savedObjectsClient: SavedObjectsClientContract;
namespace?: string;
}) => {
const options = {
type: 'alert',
...(namespace ? { namespace } : {}),
};
const pitFinder = savedObjectsClient.createPointInTimeFinder({
...options,
});
const ruleUuids: string[] = [];
for await (const response of pitFinder.find()) {
ruleUuids.push(...response.saved_objects.map((object) => object.id));
}
await pitFinder.close();
return ruleUuids;
};
const createPathReporterError = (either: Either<Errors, unknown>) => {
const error = new Error(`Failed to validate alert event`);
error.stack += '\n' + PathReporter.report(either).join('\n');
return error;
};
export function createScopedRuleRegistryClient<TFieldMap extends DefaultFieldMap>({
fieldMap,
scopedClusterClient,
savedObjectsClient,
namespace,
clusterClientAdapter,
indexAliasName,
indexTarget,
logger,
ruleData,
}: {
fieldMap: TFieldMap;
scopedClusterClient: ScopedClusterClient;
savedObjectsClient: SavedObjectsClientContract;
namespace?: string;
clusterClientAdapter: ClusterClientAdapter<{
body: OutputOfFieldMap<TFieldMap>;
index: string;
}>;
indexAliasName: string;
indexTarget: string;
logger: Logger;
ruleData?: {
rule: {
id: string;
uuid: string;
category: string;
name: string;
};
producer: string;
tags: string[];
};
}): ScopedRuleRegistryClient<TFieldMap> {
const docRt = runtimeTypeFromFieldMap(fieldMap);
const defaults: Partial<OutputOfFieldMap<DefaultFieldMap>> = ruleData
? {
'rule.uuid': ruleData.rule.uuid,
'rule.id': ruleData.rule.id,
'rule.name': ruleData.rule.name,
'rule.category': ruleData.rule.category,
'kibana.rac.producer': ruleData.producer,
tags: ruleData.tags,
}
: {};
const client: ScopedRuleRegistryClient<TFieldMap> = {
search: async (searchRequest) => {
const ruleUuids = await getRuleUuids({
savedObjectsClient,
namespace,
});
const response = await scopedClusterClient.asInternalUser.search({
...searchRequest,
index: indexTarget,
body: {
...searchRequest.body,
query: {
bool: {
filter: [
{ terms: { 'rule.uuid': ruleUuids } },
...(searchRequest.body?.query ? [searchRequest.body.query] : []),
],
},
},
},
});
return {
body: response.body as any,
events: compact(
response.body.hits.hits.map((hit) => {
const validation = docRt.decode(hit.fields);
if (isLeft(validation)) {
const error = createPathReporterError(validation);
logger.error(error);
return undefined;
}
return docRt.encode(validation.right);
})
) as EventsOf<ESSearchRequest, TFieldMap>,
};
},
index: (doc) => {
const validation = docRt.decode({
...doc,
...defaults,
});
if (isLeft(validation)) {
throw createPathReporterError(validation);
}
clusterClientAdapter.indexDocument({ body: validation.right, index: indexAliasName });
},
bulkIndex: (docs) => {
const validations = docs.map((doc) => {
return docRt.decode({
...doc,
...defaults,
});
});
const errors = compact(
validations.map((validation) =>
isLeft(validation) ? createPathReporterError(validation) : null
)
);
errors.forEach((error) => {
logger.error(error);
});
const operations = compact(
validations.map((validation) => (isRight(validation) ? validation.right : null))
).map((doc) => ({ body: doc, index: indexAliasName }));
return clusterClientAdapter.indexDocuments(operations);
},
};
return client;
}

View file

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ESSearchRequest, ESSearchResponse } from 'typings/elasticsearch';
import { DefaultFieldMap } from '../defaults/field_map';
import { PatternsUnionOf, PickWithPatterns } from '../field_map/pick_with_patterns';
import { OutputOfFieldMap } from '../field_map/runtime_type_from_fieldmap';
export type PrepopulatedRuleEventFields =
| 'rule.uuid'
| 'rule.id'
| 'rule.name'
| 'rule.type'
| 'rule.category'
| 'producer';
type FieldsOf<TFieldMap extends DefaultFieldMap> =
| Array<{ field: PatternsUnionOf<TFieldMap> } | PatternsUnionOf<TFieldMap>>
| PatternsUnionOf<TFieldMap>;
type Fields<TPattern extends string> = Array<{ field: TPattern } | TPattern> | TPattern;
type FieldsESSearchRequest<TFieldMap extends DefaultFieldMap> = ESSearchRequest & {
body?: { fields: FieldsOf<TFieldMap> };
};
export type EventsOf<
TFieldsESSearchRequest extends ESSearchRequest,
TFieldMap extends DefaultFieldMap
> = TFieldsESSearchRequest extends { body: { fields: infer TFields } }
? TFields extends Fields<infer TPattern>
? Array<OutputOfFieldMap<PickWithPatterns<TFieldMap, TPattern[]>>>
: never
: never;
export interface ScopedRuleRegistryClient<TFieldMap extends DefaultFieldMap> {
search<TSearchRequest extends FieldsESSearchRequest<TFieldMap>>(
request: TSearchRequest
): Promise<{
body: ESSearchResponse<unknown, TSearchRequest>;
events: EventsOf<TSearchRequest, TFieldMap>;
}>;
index(doc: Omit<OutputOfFieldMap<TFieldMap>, PrepopulatedRuleEventFields>): void;
bulkIndex(
doc: Array<Omit<OutputOfFieldMap<TFieldMap>, PrepopulatedRuleEventFields>>
): Promise<void>;
}

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ecsFieldMap } from '../../generated/ecs_field_map';
import { pickWithPatterns } from '../field_map/pick_with_patterns';
export const defaultFieldMap = {
...pickWithPatterns(
ecsFieldMap,
'@timestamp',
'event.kind',
'event.action',
'rule.uuid',
'rule.id',
'rule.name',
'rule.category',
'tags'
),
'kibana.rac.producer': { type: 'keyword' },
'kibana.rac.alert.uuid': { type: 'keyword' },
'kibana.rac.alert.id': { type: 'keyword' },
'kibana.rac.alert.start': { type: 'date' },
'kibana.rac.alert.end': { type: 'date' },
'kibana.rac.alert.duration.us': { type: 'long' },
'kibana.rac.alert.severity.level': { type: 'keyword' },
'kibana.rac.alert.severity.value': { type: 'long' },
'kibana.rac.alert.status': { type: 'keyword' },
} as const;
export type DefaultFieldMap = typeof defaultFieldMap;

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ILMPolicy } from '../types';
export const defaultIlmPolicy: ILMPolicy = {
policy: {
phases: {
hot: {
actions: {
rollover: {
max_age: '90d',
max_size: '50gb',
},
},
},
delete: {
actions: {
delete: {},
},
},
},
},
};

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { set } from '@elastic/safer-lodash-set';
import { FieldMap, Mappings } from '../types';
export function mappingFromFieldMap(fieldMap: FieldMap): Mappings {
const mappings = {
dynamic: 'strict' as const,
properties: {},
};
const fields = Object.keys(fieldMap).map((key) => {
const field = fieldMap[key];
return {
name: key,
...field,
};
});
fields.forEach((field) => {
const { name, required, array, ...rest } = field;
set(mappings.properties, field.name.split('.').join('.properties.'), rest);
});
return mappings;
}

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import util from 'util';
import { FieldMap } from '../types';
export function mergeFieldMaps<T1 extends FieldMap, T2 extends FieldMap>(
first: T1,
second: T2
): T1 & T2 {
const conflicts: Array<Record<string, [{ type: string }, { type: string }]>> = [];
Object.keys(second).forEach((name) => {
const field = second[name];
const parts = name.split('.');
const parents = parts.slice(0, parts.length - 2).map((part, index, array) => {
return [...array.slice(0, index - 1), part].join('.');
});
parents
.filter((parent) => first[parent] !== undefined)
.forEach((parent) => {
conflicts.push({
[parent]: [{ type: 'object' }, first[parent]!],
});
});
if (first[name]) {
conflicts.push({
[name]: [field, first[name]],
});
}
});
if (conflicts.length) {
const err = new Error(`Could not merge mapping due to conflicts`);
Object.assign(err, { conflicts: util.inspect(conflicts, { depth: null }) });
throw err;
}
return {
...first,
...second,
};
}

View file

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { pickWithPatterns } from './pick_with_patterns';
describe('pickWithPatterns', () => {
const fieldMap = {
'event.category': { type: 'keyword' },
'event.kind': { type: 'keyword' },
'destination.bytes': {
type: 'long',
array: false,
required: false,
},
'destination.domain': {
type: 'keyword',
array: false,
required: false,
},
'destination.geo.city_name': {
type: 'keyword',
array: false,
required: false,
},
} as const;
it('picks a single field', () => {
expect(Object.keys(pickWithPatterns(fieldMap, 'event.category'))).toEqual(['event.category']);
});
it('picks event fields', () => {
expect(Object.keys(pickWithPatterns(fieldMap, 'event.*')).sort()).toEqual([
'event.category',
'event.kind',
]);
});
it('picks destination.geo fields', () => {
expect(Object.keys(pickWithPatterns(fieldMap, 'destination.geo.*')).sort()).toEqual([
'destination.geo.city_name',
]);
});
it('picks all destination fields', () => {
expect(Object.keys(pickWithPatterns(fieldMap, 'destination.*')).sort()).toEqual([
'destination.bytes',
'destination.domain',
'destination.geo.city_name',
]);
});
it('picks fields from multiple patterns', () => {
expect(
Object.keys(pickWithPatterns(fieldMap, 'destination.geo.*', 'event.category')).sort()
).toEqual(['destination.geo.city_name', 'event.category']);
});
it('picks all fields', () => {
expect(Object.keys(pickWithPatterns(fieldMap, '*')).sort()).toEqual([
'destination.bytes',
'destination.domain',
'destination.geo.city_name',
'event.category',
'event.kind',
]);
});
});

View file

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ValuesType, SetIntersection, OmitByValueExact } from 'utility-types';
import { pick } from 'lodash';
type SplitByDot<
TPath extends string,
TPrefix extends string = ''
> = TPath extends `${infer TKey}.${infer TRest}`
? [`${TPrefix}${TKey}.*`, ...SplitByDot<TRest, `${TPrefix}${TKey}.`>]
: [`${TPrefix}${TPath}`];
type PatternMapOf<T extends Record<string, any>> = {
[TKey in keyof T]: ValuesType<TKey extends string ? ['*', ...SplitByDot<TKey>] : never>;
};
export type PickWithPatterns<
T extends Record<string, any>,
TPatterns extends string[]
> = OmitByValueExact<
{
[TFieldName in keyof T]: SetIntersection<
ValuesType<TPatterns>,
PatternMapOf<T>[TFieldName]
> extends never
? never
: T[TFieldName];
},
never
>;
export type PatternsUnionOf<T extends Record<string, any>> = '*' | ValuesType<PatternMapOf<T>>;
export function pickWithPatterns<
T extends Record<string, any>,
TPatterns extends Array<PatternsUnionOf<T>>
>(map: T, ...patterns: TPatterns): PickWithPatterns<T, TPatterns> {
const allFields = Object.keys(map);
const matchedFields = allFields.filter((field) =>
patterns.some((pattern) => {
if (pattern === field) {
return true;
}
const fieldParts = field.split('.');
const patternParts = pattern.split('.');
if (patternParts.indexOf('*') !== patternParts.length - 1) {
return false;
}
return fieldParts.every((fieldPart, index) => {
const patternPart = patternParts.length - 1 < index ? '*' : patternParts[index];
return fieldPart === patternPart || patternPart === '*';
});
})
);
return (pick(map, matchedFields) as unknown) as PickWithPatterns<T, TPatterns>;
}

View file

@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { runtimeTypeFromFieldMap } from './runtime_type_from_fieldmap';
describe('runtimeTypeFromFieldMap', () => {
const fieldmapRt = runtimeTypeFromFieldMap({
keywordField: { type: 'keyword' },
longField: { type: 'long' },
requiredKeywordField: { type: 'keyword', required: true },
multiKeywordField: { type: 'keyword', array: true },
} as const);
it('accepts both singular and array fields', () => {
expect(
fieldmapRt.is({
requiredKeywordField: 'keyword',
})
).toBe(true);
expect(
fieldmapRt.is({
requiredKeywordField: ['keyword'],
})
).toBe(true);
expect(
fieldmapRt.is({
requiredKeywordField: ['keyword'],
multiKeywordField: 'keyword',
})
).toBe(true);
expect(
fieldmapRt.is({
requiredKeywordField: ['keyword'],
multiKeywordField: ['keyword'],
})
).toBe(true);
});
it('fails on invalid data types', () => {
expect(
fieldmapRt.is({
requiredKeywordField: 2,
})
).toBe(false);
expect(
fieldmapRt.is({
requiredKeywordField: [2],
})
).toBe(false);
expect(
fieldmapRt.is({
requiredKeywordField: ['keyword'],
longField: ['keyword'],
})
).toBe(false);
expect(
fieldmapRt.is({
requiredKeywordField: ['keyword'],
longField: [3],
})
).toBe(true);
expect(
fieldmapRt.is({
requiredKeywordField: ['keyword'],
longField: 3,
})
).toBe(true);
});
it('outputs to single or array values', () => {
expect(
fieldmapRt.encode({
requiredKeywordField: ['required'],
keywordField: 'keyword',
longField: [3, 2],
multiKeywordField: ['keyword', 'foo'],
})
).toEqual({
requiredKeywordField: 'required',
keywordField: 'keyword',
longField: 3,
multiKeywordField: ['keyword', 'foo'],
});
});
});

View file

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { mapValues, pickBy } from 'lodash';
import * as t from 'io-ts';
import { Mutable, PickByValueExact } from 'utility-types';
import { FieldMap } from '../types';
const esFieldTypeMap = {
keyword: t.string,
text: t.string,
date: t.string,
boolean: t.boolean,
byte: t.number,
long: t.number,
integer: t.number,
short: t.number,
double: t.number,
float: t.number,
scaled_float: t.number,
unsigned_long: t.number,
flattened: t.record(t.string, t.array(t.string)),
};
type EsFieldTypeMap = typeof esFieldTypeMap;
type EsFieldTypeOf<T extends string> = T extends keyof EsFieldTypeMap
? EsFieldTypeMap[T]
: t.UnknownC;
type RequiredKeysOf<T extends Record<string, { required?: boolean }>> = keyof PickByValueExact<
{
[key in keyof T]: T[key]['required'];
},
true
>;
type IntersectionTypeOf<
T extends Record<string, { required?: boolean; type: t.Any }>
> = t.IntersectionC<
[
t.TypeC<Pick<{ [key in keyof T]: T[key]['type'] }, RequiredKeysOf<T>>>,
t.PartialC<{ [key in keyof T]: T[key]['type'] }>
]
>;
type CastArray<T extends t.Type<any>> = t.Type<
t.TypeOf<T> | Array<t.TypeOf<T>>,
Array<t.TypeOf<T>>,
unknown
>;
type CastSingle<T extends t.Type<any>> = t.Type<
t.TypeOf<T> | Array<t.TypeOf<T>>,
t.TypeOf<T>,
unknown
>;
const createCastArrayRt = <T extends t.Type<any>>(type: T): CastArray<T> => {
const union = t.union([type, t.array(type)]);
return new t.Type('castArray', union.is, union.validate, (a) => (Array.isArray(a) ? a : [a]));
};
const createCastSingleRt = <T extends t.Type<any>>(type: T): CastSingle<T> => {
const union = t.union([type, t.array(type)]);
return new t.Type('castSingle', union.is, union.validate, (a) => (Array.isArray(a) ? a[0] : a));
};
type MapTypeValues<T extends FieldMap> = {
[key in keyof T]: {
required: T[key]['required'];
type: T[key]['array'] extends true
? CastArray<EsFieldTypeOf<T[key]['type']>>
: CastSingle<EsFieldTypeOf<T[key]['type']>>;
};
};
type FieldMapType<T extends FieldMap> = IntersectionTypeOf<MapTypeValues<T>>;
export type TypeOfFieldMap<T extends FieldMap> = Mutable<t.TypeOf<FieldMapType<T>>>;
export type OutputOfFieldMap<T extends FieldMap> = Mutable<t.OutputOf<FieldMapType<T>>>;
export function runtimeTypeFromFieldMap<TFieldMap extends FieldMap>(
fieldMap: TFieldMap
): FieldMapType<TFieldMap> {
function mapToType(fields: FieldMap) {
return mapValues(fields, (field, key) => {
const type =
field.type in esFieldTypeMap
? esFieldTypeMap[field.type as keyof EsFieldTypeMap]
: t.unknown;
return field.array ? createCastArrayRt(type) : createCastSingleRt(type);
});
}
const required = pickBy(fieldMap, (field) => field.required);
return (t.intersection([
t.exact(t.partial(mapToType(fieldMap))),
t.type(mapToType(required)),
]) as unknown) as FieldMapType<TFieldMap>;
}

View file

@ -0,0 +1,240 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreSetup, Logger, RequestHandlerContext } from 'kibana/server';
import { inspect } from 'util';
import { SpacesServiceStart } from '../../../spaces/server';
import {
ActionVariable,
AlertInstanceState,
AlertTypeParams,
AlertTypeState,
} from '../../../alerting/common';
import { createReadySignal, ClusterClientAdapter } from '../../../event_log/server';
import { FieldMap, ILMPolicy } from './types';
import { RuleParams, RuleType } from '../types';
import { mergeFieldMaps } from './field_map/merge_field_maps';
import { OutputOfFieldMap } from './field_map/runtime_type_from_fieldmap';
import { mappingFromFieldMap } from './field_map/mapping_from_field_map';
import { PluginSetupContract as AlertingPluginSetupContract } from '../../../alerting/server';
import { createScopedRuleRegistryClient } from './create_scoped_rule_registry_client';
import { DefaultFieldMap } from './defaults/field_map';
import { ScopedRuleRegistryClient } from './create_scoped_rule_registry_client/types';
interface RuleRegistryOptions<TFieldMap extends FieldMap> {
kibanaIndex: string;
kibanaVersion: string;
name: string;
logger: Logger;
coreSetup: CoreSetup;
spacesStart?: SpacesServiceStart;
fieldMap: TFieldMap;
ilmPolicy: ILMPolicy;
alertingPluginSetupContract: AlertingPluginSetupContract;
writeEnabled: boolean;
}
export class RuleRegistry<TFieldMap extends DefaultFieldMap> {
private readonly esAdapter: ClusterClientAdapter<{
body: OutputOfFieldMap<TFieldMap>;
index: string;
}>;
private readonly children: Array<RuleRegistry<TFieldMap>> = [];
constructor(private readonly options: RuleRegistryOptions<TFieldMap>) {
const { logger, coreSetup } = options;
const { wait, signal } = createReadySignal<boolean>();
this.esAdapter = new ClusterClientAdapter<{
body: OutputOfFieldMap<TFieldMap>;
index: string;
}>({
wait,
elasticsearchClientPromise: coreSetup
.getStartServices()
.then(([{ elasticsearch }]) => elasticsearch.client.asInternalUser),
logger: logger.get('esAdapter'),
});
if (this.options.writeEnabled) {
this.initialize()
.then(() => {
this.options.logger.debug('Bootstrapped alerts index');
signal(true);
})
.catch((err) => {
logger.error(inspect(err, { depth: null }));
signal(false);
});
} else {
logger.debug('Write disabled, indices are not being bootstrapped');
}
}
private getEsNames() {
const base = [this.options.kibanaIndex, this.options.name];
const indexTarget = `${base.join('-')}*`;
const indexAliasName = [...base, this.options.kibanaVersion.toLowerCase()].join('-');
const policyName = [...base, 'policy'].join('-');
return {
indexAliasName,
indexTarget,
policyName,
};
}
private async initialize() {
const { indexAliasName, policyName } = this.getEsNames();
const ilmPolicyExists = await this.esAdapter.doesIlmPolicyExist(policyName);
if (!ilmPolicyExists) {
await this.esAdapter.createIlmPolicy(
policyName,
(this.options.ilmPolicy as unknown) as Record<string, unknown>
);
}
const templateExists = await this.esAdapter.doesIndexTemplateExist(indexAliasName);
if (!templateExists) {
await this.esAdapter.createIndexTemplate(indexAliasName, {
index_patterns: [`${indexAliasName}-*`],
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
'index.lifecycle.name': policyName,
'index.lifecycle.rollover_alias': indexAliasName,
'sort.field': '@timestamp',
'sort.order': 'desc',
},
mappings: mappingFromFieldMap(this.options.fieldMap),
});
}
const aliasExists = await this.esAdapter.doesAliasExist(indexAliasName);
if (!aliasExists) {
await this.esAdapter.createIndex(`${indexAliasName}-000001`, {
aliases: {
[indexAliasName]: {
is_write_index: true,
},
},
});
}
}
createScopedRuleRegistryClient({
context,
}: {
context: RequestHandlerContext;
}): ScopedRuleRegistryClient<TFieldMap> | undefined {
if (!this.options.writeEnabled) {
return undefined;
}
const { indexAliasName, indexTarget } = this.getEsNames();
return createScopedRuleRegistryClient({
savedObjectsClient: context.core.savedObjects.getClient({ includedHiddenTypes: ['alert'] }),
scopedClusterClient: context.core.elasticsearch.client,
clusterClientAdapter: this.esAdapter,
fieldMap: this.options.fieldMap,
indexAliasName,
indexTarget,
logger: this.options.logger,
});
}
registerType<TRuleParams extends RuleParams, TActionVariable extends ActionVariable>(
type: RuleType<TFieldMap, TRuleParams, TActionVariable>
) {
const logger = this.options.logger.get(type.id);
const { indexAliasName, indexTarget } = this.getEsNames();
this.options.alertingPluginSetupContract.registerType<
AlertTypeParams,
AlertTypeState,
AlertInstanceState,
{ [key in TActionVariable['name']]: any },
string
>({
...type,
executor: async (executorOptions) => {
const { services, namespace, alertId, name, tags } = executorOptions;
const rule = {
id: type.id,
uuid: alertId,
category: type.name,
name,
};
const producer = type.producer;
return type.executor({
...executorOptions,
rule,
producer,
services: {
...services,
logger,
...(this.options.writeEnabled
? {
scopedRuleRegistryClient: createScopedRuleRegistryClient({
savedObjectsClient: services.savedObjectsClient,
scopedClusterClient: services.scopedClusterClient,
clusterClientAdapter: this.esAdapter,
fieldMap: this.options.fieldMap,
indexAliasName,
indexTarget,
namespace,
ruleData: {
producer,
rule,
tags,
},
logger: this.options.logger,
}),
}
: {}),
},
});
},
});
}
create<TNextFieldMap extends FieldMap>({
name,
fieldMap,
ilmPolicy,
}: {
name: string;
fieldMap: TNextFieldMap;
ilmPolicy?: ILMPolicy;
}): RuleRegistry<TFieldMap & TNextFieldMap> {
const mergedFieldMap = fieldMap
? mergeFieldMaps(this.options.fieldMap, fieldMap)
: this.options.fieldMap;
const child = new RuleRegistry({
...this.options,
logger: this.options.logger.get(name),
name: [this.options.name, name].filter(Boolean).join('-'),
fieldMap: mergedFieldMap,
...(ilmPolicy ? { ilmPolicy } : {}),
});
this.children.push(child);
// @ts-expect-error could be instantiated with a different subtype of constraint
return child;
}
}

View file

@ -0,0 +1,235 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import * as t from 'io-ts';
import { isLeft } from 'fp-ts/lib/Either';
import v4 from 'uuid/v4';
import { AlertInstance } from '../../../../alerting/server';
import { ActionVariable, AlertInstanceState } from '../../../../alerting/common';
import { RuleParams, RuleType } from '../../types';
import { DefaultFieldMap } from '../defaults/field_map';
import { OutputOfFieldMap } from '../field_map/runtime_type_from_fieldmap';
import { PrepopulatedRuleEventFields } from '../create_scoped_rule_registry_client/types';
import { RuleRegistry } from '..';
type UserDefinedAlertFields<TFieldMap extends DefaultFieldMap> = Omit<
OutputOfFieldMap<TFieldMap>,
PrepopulatedRuleEventFields | 'kibana.rac.alert.id' | 'kibana.rac.alert.uuid' | '@timestamp'
>;
type LifecycleAlertService<
TFieldMap extends DefaultFieldMap,
TActionVariable extends ActionVariable
> = (alert: {
id: string;
fields: UserDefinedAlertFields<TFieldMap>;
}) => AlertInstance<AlertInstanceState, { [key in TActionVariable['name']]: any }, string>;
type CreateLifecycleRuleType<TFieldMap extends DefaultFieldMap> = <
TRuleParams extends RuleParams,
TActionVariable extends ActionVariable
>(
type: RuleType<
TFieldMap,
TRuleParams,
TActionVariable,
{ alertWithLifecycle: LifecycleAlertService<TFieldMap, TActionVariable> }
>
) => RuleType<TFieldMap, TRuleParams, TActionVariable>;
const trackedAlertStateRt = t.type({
alertId: t.string,
alertUuid: t.string,
started: t.string,
});
const wrappedStateRt = t.type({
wrapped: t.record(t.string, t.unknown),
trackedAlerts: t.record(t.string, trackedAlertStateRt),
});
export function createLifecycleRuleTypeFactory<
TRuleRegistry extends RuleRegistry<DefaultFieldMap>
>(): TRuleRegistry extends RuleRegistry<infer TFieldMap>
? CreateLifecycleRuleType<TFieldMap>
: never;
export function createLifecycleRuleTypeFactory(): CreateLifecycleRuleType<DefaultFieldMap> {
return (type) => {
return {
...type,
executor: async (options) => {
const {
services: { scopedRuleRegistryClient, alertInstanceFactory, logger },
state: previousState,
rule,
} = options;
const decodedState = wrappedStateRt.decode(previousState);
const state = isLeft(decodedState)
? {
wrapped: previousState,
trackedAlerts: {},
}
: decodedState.right;
const currentAlerts: Record<
string,
UserDefinedAlertFields<DefaultFieldMap> & { 'kibana.rac.alert.id': string }
> = {};
const timestamp = options.startedAt.toISOString();
const nextWrappedState = await type.executor({
...options,
state: state.wrapped,
services: {
...options.services,
alertWithLifecycle: ({ id, fields }) => {
currentAlerts[id] = {
...fields,
'kibana.rac.alert.id': id,
};
return alertInstanceFactory(id);
},
},
});
const currentAlertIds = Object.keys(currentAlerts);
const trackedAlertIds = Object.keys(state.trackedAlerts);
const newAlertIds = currentAlertIds.filter((alertId) => !trackedAlertIds.includes(alertId));
const allAlertIds = [...new Set(currentAlertIds.concat(trackedAlertIds))];
const trackedAlertStatesOfRecovered = Object.values(state.trackedAlerts).filter(
(trackedAlertState) => !currentAlerts[trackedAlertState.alertId]
);
logger.debug(
`Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStatesOfRecovered.length} recovered)`
);
const alertsDataMap: Record<string, UserDefinedAlertFields<DefaultFieldMap>> = {
...currentAlerts,
};
if (scopedRuleRegistryClient && trackedAlertStatesOfRecovered.length) {
const { events } = await scopedRuleRegistryClient.search({
body: {
query: {
bool: {
filter: [
{
term: {
'rule.uuid': rule.uuid,
},
},
{
terms: {
'kibana.rac.alert.uuid': trackedAlertStatesOfRecovered.map(
(trackedAlertState) => trackedAlertState.alertUuid
),
},
},
],
},
},
size: trackedAlertStatesOfRecovered.length,
collapse: {
field: 'kibana.rac.alert.uuid',
},
_source: false,
fields: ['*'],
sort: {
'@timestamp': 'desc' as const,
},
},
});
events.forEach((event) => {
const alertId = event['kibana.rac.alert.id']!;
alertsDataMap[alertId] = event;
});
}
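// Build one lifecycle event per alert, stamping the lifecycle fields
// (start, uuid, duration, status) and the event.action for this run.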
const eventsToIndex: Array<OutputOfFieldMap<DefaultFieldMap>> = allAlertIds.map(
(alertId) => {
const alertData = alertsDataMap[alertId];
if (!alertData) {
logger.warn(`Could not find alert data for ${alertId}`);
}
const event: OutputOfFieldMap<DefaultFieldMap> = {
...alertData,
'@timestamp': timestamp,
'event.kind': 'state',
'kibana.rac.alert.id': alertId,
};
const isNew = !state.trackedAlerts[alertId];
const isRecovered = !currentAlerts[alertId];
const isActiveButNotNew = !isNew && !isRecovered;
const isActive = !isRecovered;
const { alertUuid, started } = state.trackedAlerts[alertId] ?? {
alertUuid: v4(),
started: timestamp,
};
event['kibana.rac.alert.start'] = started;
event['kibana.rac.alert.uuid'] = alertUuid;
if (isNew) {
event['event.action'] = 'open';
}
if (isRecovered) {
event['kibana.rac.alert.end'] = timestamp;
event['event.action'] = 'close';
event['kibana.rac.alert.status'] = 'closed';
}
if (isActiveButNotNew) {
event['event.action'] = 'active';
}
if (isActive) {
event['kibana.rac.alert.status'] = 'open';
}
event['kibana.rac.alert.duration.us'] =
(options.startedAt.getTime() - new Date(event['kibana.rac.alert.start']!).getTime()) *
1000;
return event;
}
);
if (eventsToIndex.length && scopedRuleRegistryClient) {
await scopedRuleRegistryClient.bulkIndex(eventsToIndex);
}
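// Only alerts that are still open are carried over into the next task state.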
const nextTrackedAlerts = Object.fromEntries(
eventsToIndex
.filter((event) => event['kibana.rac.alert.status'] !== 'closed')
.map((event) => {
const alertId = event['kibana.rac.alert.id']!;
const alertUuid = event['kibana.rac.alert.uuid']!;
const started = new Date(event['kibana.rac.alert.start']!).toISOString();
return [alertId, { alertId, alertUuid, started }];
})
);
return {
wrapped: nextWrappedState,
trackedAlerts: nextTrackedAlerts,
};
},
};
};
}
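To show how a rule type producer might plug into this factory, here is a minimal, hedged sketch. The import path, the ExampleRuleRegistry type, the rule id, the action group, and the field names are assumptions made for illustration only; a real rule would query its own indices and use fields defined in its registry's field map.

import { schema } from '@kbn/config-schema';
// Hypothetical imports for this sketch; a producer would use the registry and
// factory exposed to it by the rule registry plugin.
import { createLifecycleRuleTypeFactory } from '../../../rule_registry/server';
import type { ExampleRuleRegistry } from './example_rule_registry';

const createLifecycleRuleType = createLifecycleRuleTypeFactory<ExampleRuleRegistry>();

export const exampleRuleType = createLifecycleRuleType({
  id: 'example.transaction_error_rate',
  name: 'Transaction error rate threshold (example)',
  actionGroups: [{ id: 'threshold_met', name: 'Threshold met' }],
  defaultActionGroupId: 'threshold_met',
  producer: 'observability',
  minimumLicenseRequired: 'basic',
  validate: {
    params: schema.object({ threshold: schema.number() }),
  },
  actionVariables: {
    context: [
      { name: 'serviceName' as const, description: 'The service the alert is created for' },
    ],
  },
  executor: async ({ params, services }) => {
    // A real rule would compute this from a query against its source indices.
    const errorRate = 0.42;
    if (errorRate > params.threshold) {
      // alertWithLifecycle registers the alert with the wrapper above, which
      // writes the open/active/close lifecycle events to the registry index.
      // The field names assume they exist in the registry's field map.
      services
        .alertWithLifecycle({
          id: 'opbeans-go_request',
          fields: { 'service.name': 'opbeans-go', 'transaction.type': 'request' },
        })
        .scheduleActions('threshold_met', { serviceName: 'opbeans-go' });
    }
    // Whatever is returned here is persisted as the wrapped rule state.
    return {};
  },
});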

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export interface Mappings {
dynamic: 'strict' | boolean;
properties: Record<string, { type: string } | Mappings>;
}
enum ILMPolicyPhase {
hot = 'hot',
delete = 'delete',
}
enum ILMPolicyAction {
rollover = 'rollover',
delete = 'delete',
}
interface ILMActionOptions {
[ILMPolicyAction.rollover]: {
max_size: string;
max_age: string;
};
[ILMPolicyAction.delete]: {};
}
export interface ILMPolicy {
policy: {
phases: Record<
ILMPolicyPhase,
{
actions: {
[key in keyof ILMActionOptions]?: ILMActionOptions[key];
};
}
>;
};
}
export type FieldMap = Record<string, { type: string; required?: boolean; array?: boolean }>;
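For orientation, the following illustrative values satisfy these interfaces; the field names, rollover sizes, and policy shape are example assumptions, not the defaults the plugin actually ships.

const exampleFieldMap: FieldMap = {
  '@timestamp': { type: 'date', required: true },
  'rule.uuid': { type: 'keyword' },
  tags: { type: 'keyword', array: true },
};

const exampleIlmPolicy: ILMPolicy = {
  policy: {
    phases: {
      hot: {
        actions: {
          rollover: { max_size: '50gb', max_age: '90d' },
        },
      },
      delete: {
        actions: {
          delete: {},
        },
      },
    },
  },
};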

View file

@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Type, TypeOf } from '@kbn/config-schema';
import { Logger } from 'kibana/server';
import {
ActionVariable,
AlertInstanceContext,
AlertInstanceState,
AlertTypeParams,
AlertTypeState,
} from '../../alerting/common';
import { ActionGroup, AlertExecutorOptions } from '../../alerting/server';
import { RuleRegistry } from './rule_registry';
import { ScopedRuleRegistryClient } from './rule_registry/create_scoped_rule_registry_client/types';
import { DefaultFieldMap } from './rule_registry/defaults/field_map';
export type RuleParams = Type<any>;
type TypeOfRuleParams<TRuleParams extends RuleParams> = TypeOf<TRuleParams>;
type RuleExecutorServices<
TFieldMap extends DefaultFieldMap,
TActionVariable extends ActionVariable
> = AlertExecutorOptions<
AlertTypeParams,
AlertTypeState,
AlertInstanceState,
{ [key in TActionVariable['name']]: any },
string
>['services'] & {
logger: Logger;
scopedRuleRegistryClient?: ScopedRuleRegistryClient<TFieldMap>;
};
type PassthroughAlertExecutorOptions = Pick<
AlertExecutorOptions<
AlertTypeParams,
AlertTypeState,
AlertInstanceState,
AlertInstanceContext,
string
>,
'previousStartedAt' | 'startedAt' | 'state'
>;
type RuleExecutorFunction<
TFieldMap extends DefaultFieldMap,
TRuleParams extends RuleParams,
TActionVariable extends ActionVariable,
TAdditionalRuleExecutorServices extends Record<string, any>
> = (
options: PassthroughAlertExecutorOptions & {
services: RuleExecutorServices<TFieldMap, TActionVariable> & TAdditionalRuleExecutorServices;
params: TypeOfRuleParams<TRuleParams>;
rule: {
id: string;
uuid: string;
name: string;
category: string;
};
producer: string;
}
) => Promise<Record<string, any>>;
interface RuleTypeBase {
id: string;
name: string;
actionGroups: Array<ActionGroup<string>>;
defaultActionGroupId: string;
producer: string;
minimumLicenseRequired: 'basic' | 'gold' | 'trial';
}
export type RuleType<
TFieldMap extends DefaultFieldMap,
TRuleParams extends RuleParams,
TActionVariable extends ActionVariable,
TAdditionalRuleExecutorServices extends Record<string, any> = {}
> = RuleTypeBase & {
validate: {
params: TRuleParams;
};
actionVariables: {
context: TActionVariable[];
};
executor: RuleExecutorFunction<
TFieldMap,
TRuleParams,
TActionVariable,
TAdditionalRuleExecutorServices
>;
};
export type FieldMapOf<
TRuleRegistry extends RuleRegistry<any>
> = TRuleRegistry extends RuleRegistry<infer TFieldMap> ? TFieldMap : never;
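As a hedged illustration of the params contract: RuleParams is any @kbn/config-schema runtime type, so a rule's validate.params might be declared as below. The field names are examples only, and FieldMapOf is shown as a commented usage against a hypothetical registry type.

import { schema } from '@kbn/config-schema';

export const exampleRuleParams = schema.object({
  windowSize: schema.number(),
  windowUnit: schema.string(),
  threshold: schema.number(),
});

// Assuming a hypothetical `ExampleRuleRegistry`, FieldMapOf extracts its
// field map type for reuse:
// type ExampleFieldMap = FieldMapOf<ExampleRuleRegistry>;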

View file

@ -0,0 +1,15 @@
{
"extends": "../../../tsconfig.base.json",
"compilerOptions": {
"composite": true,
"outDir": "./target/types",
"emitDeclarationOnly": true,
"declaration": true,
"declarationMap": true
},
"include": ["common/**/*", "server/**/*", "../../../typings/**/*"],
"references": [
{ "path": "../../../src/core/tsconfig.json" },
{ "path": "../alerting/tsconfig.json" },
]
}

View file

@ -34,6 +34,7 @@ const onlyNotInCoverageTests = [
require.resolve('../test/case_api_integration/basic/config.ts'),
require.resolve('../test/apm_api_integration/basic/config.ts'),
require.resolve('../test/apm_api_integration/trial/config.ts'),
require.resolve('../test/apm_api_integration/rules/config.ts'),
require.resolve('../test/detection_engine_api_integration/security_and_spaces/config.ts'),
require.resolve('../test/detection_engine_api_integration/basic/config.ts'),
require.resolve('../test/lists_api_integration/security_and_spaces/config.ts'),

View file

@ -18,6 +18,7 @@ import { registry } from './registry';
interface Config {
name: APMFtrConfigName;
license: 'basic' | 'trial';
kibanaConfig?: Record<string, string>;
}
const supertestAsApmUser = (kibanaServer: UrlObject, apmUser: ApmUser) => async (
@ -37,7 +38,7 @@ const supertestAsApmUser = (kibanaServer: UrlObject, apmUser: ApmUser) => async
};
export function createTestConfig(config: Config) {
const { license, name } = config;
const { license, name, kibanaConfig } = config;
return async ({ readConfigFile }: FtrConfigProviderContext) => {
const xPackAPITestsConfig = await readConfigFile(
@ -79,7 +80,15 @@ export function createTestConfig(config: Config) {
...xPackAPITestsConfig.get('esTestCluster'),
license,
},
kbnTestServer: xPackAPITestsConfig.get('kbnTestServer'),
kbnTestServer: {
...xPackAPITestsConfig.get('kbnTestServer'),
serverArgs: [
...xPackAPITestsConfig.get('kbnTestServer.serverArgs'),
...(kibanaConfig
? Object.entries(kibanaConfig).map(([key, value]) => `--${key}=${value}`)
: []),
],
},
};
};
}
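Each kibanaConfig entry is turned into a CLI flag for the test Kibana server; for example, 'xpack.ruleRegistry.writeEnabled': 'true' becomes --xpack.ruleRegistry.writeEnabled=true.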

View file

@ -15,6 +15,12 @@ const apmFtrConfigs = {
trial: {
license: 'trial' as const,
},
rules: {
license: 'trial' as const,
kibanaConfig: {
'xpack.ruleRegistry.writeEnabled': 'true',
},
},
};
export type APMFtrConfigName = keyof typeof apmFtrConfigs;

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { configs } from '../configs';
export default configs.rules;

View file

@ -0,0 +1,387 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { get, merge, omit } from 'lodash';
import { FtrProviderContext } from '../../common/ftr_provider_context';
import { registry } from '../../common/registry';
interface Alert {
schedule: {
interval: string;
};
updatedAt: string;
executionStatus: {
lastExecutionDate: string;
status: string;
};
updatedBy: string;
id: string;
params: Record<string, unknown>;
scheduledTaskId: string;
}
export default function ApiTest({ getService }: FtrProviderContext) {
const supertest = getService('supertestAsApmWriteUser');
const es = getService('es');
const MAX_POLLS = 5;
const BULK_INDEX_DELAY = 1000;
const INDEXING_DELAY = 5000;
const ALERTS_INDEX_TARGET = '.kibana-alerts-*-apm*';
const APM_TRANSACTION_INDEX_NAME = 'apm-8.0.0-transaction';
const createTransactionEvent = (override: Record<string, any>) => {
const now = Date.now();
const time = now - INDEXING_DELAY;
return merge(
{
'@timestamp': new Date(time).toISOString(),
service: {
name: 'opbeans-go',
},
event: {
outcome: 'success',
},
transaction: {
duration: {
us: 1000000,
},
type: 'request',
},
processor: {
event: 'transaction',
},
observer: {
version_major: 7,
},
},
override
);
};
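// Polls the alert until its lastExecutionDate changes, then waits for the
// bulk indexing delay and refreshes the alerts index so the new documents
// are searchable. Gives up after MAX_POLLS attempts.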
async function waitUntilNextExecution(
alert: Alert,
intervalInSeconds: number = 1,
count: number = 0
): Promise<Alert> {
await new Promise((resolve) => {
setTimeout(resolve, intervalInSeconds * 1000);
});
const { body, status } = await supertest
.get(`/api/alerts/alert/${alert.id}`)
.set('kbn-xsrf', 'foo');
if (status >= 300) {
const error = new Error('Error getting alert');
Object.assign(error, { response: { body, status } });
throw error;
}
const nextAlert = body as Alert;
if (nextAlert.executionStatus.lastExecutionDate !== alert.executionStatus.lastExecutionDate) {
await new Promise((resolve) => {
setTimeout(resolve, BULK_INDEX_DELAY);
});
await es.indices.refresh({
index: ALERTS_INDEX_TARGET,
});
return nextAlert;
}
if (count >= MAX_POLLS) {
throw new Error('Maximum number of polls exceeded');
}
return waitUntilNextExecution(alert, intervalInSeconds, count + 1);
}
registry.when('Rule registry with write enabled', { config: 'rules', archives: [] }, () => {
it('bootstraps the apm alert indices', async () => {
const { body } = await es.indices.get({
index: ALERTS_INDEX_TARGET,
expand_wildcards: 'open',
allow_no_indices: false,
});
const indices = Object.entries(body).map(([indexName, index]) => {
return {
indexName,
index,
};
});
const indexNames = indices.map((index) => index.indexName);
const apmIndex = indices[0];
// make sure it only creates one index
expect(indices.length).to.be(1);
const apmIndexName = apmIndex.indexName;
expect(apmIndexName.split('-').includes('observability')).to.be(true);
expect(apmIndexName.split('-').includes('apm')).to.be(true);
expect(indexNames[0].startsWith('.kibana-alerts-observability-apm')).to.be(true);
expect(get(apmIndex, 'index.mappings.properties.service.properties.environment.type')).to.be(
'keyword'
);
});
describe('when creating a rule', () => {
let createResponse: {
alert: Alert;
status: number;
};
before(async () => {
await es.indices.create({
index: APM_TRANSACTION_INDEX_NAME,
body: {
mappings: {
dynamic: 'strict',
properties: {
event: {
properties: {
outcome: {
type: 'keyword',
},
},
},
processor: {
properties: {
event: {
type: 'keyword',
},
},
},
observer: {
properties: {
version_major: {
type: 'byte',
},
},
},
service: {
properties: {
name: {
type: 'keyword',
},
environment: {
type: 'keyword',
},
},
},
transaction: {
properties: {
type: {
type: 'keyword',
},
duration: {
properties: {
us: {
type: 'long',
},
},
},
},
},
'@timestamp': {
type: 'date',
},
},
},
},
});
const body = {
params: {
threshold: 30,
windowSize: 5,
windowUnit: 'm',
transactionType: 'request',
environment: 'ENVIRONMENT_ALL',
serviceName: 'opbeans-go',
},
consumer: 'apm',
alertTypeId: 'apm.transaction_error_rate',
schedule: { interval: '5s' },
actions: [],
tags: ['apm', 'service.name:opbeans-go'],
notifyWhen: 'onActionGroupChange',
name: 'Transaction error rate threshold | opbeans-go',
};
const { body: response, status } = await supertest
.post('/api/alerts/alert')
.send(body)
.set('kbn-xsrf', 'foo');
createResponse = {
alert: response,
status,
};
});
after(async () => {
if (createResponse.alert) {
const { body, status } = await supertest
.delete(`/api/alerts/alert/${createResponse.alert.id}`)
.set('kbn-xsrf', 'foo');
if (status >= 300) {
const error = new Error('Error deleting alert');
Object.assign(error, { response: { body, status } });
throw error;
}
}
await es.deleteByQuery({
index: ALERTS_INDEX_TARGET,
body: {
query: {
match_all: {},
},
},
refresh: true,
});
await es.indices.delete({
index: APM_TRANSACTION_INDEX_NAME,
});
});
it('writes alerts data to the alert indices', async () => {
expect(createResponse.status).to.be.below(299);
expect(createResponse.alert).not.to.be(undefined);
let alert = await waitUntilNextExecution(createResponse.alert);
const beforeDataResponse = await es.search({
index: ALERTS_INDEX_TARGET,
body: {
query: {
match_all: {},
},
},
size: 1,
});
expect(beforeDataResponse.body.hits.hits.length).to.be(0);
await es.index({
index: APM_TRANSACTION_INDEX_NAME,
body: createTransactionEvent({
event: {
outcome: 'success',
},
}),
refresh: true,
});
alert = await waitUntilNextExecution(alert);
const afterInitialDataResponse = await es.search({
index: ALERTS_INDEX_TARGET,
body: {
query: {
match_all: {},
},
},
size: 1,
});
expect(afterInitialDataResponse.body.hits.hits.length).to.be(0);
await es.index({
index: APM_TRANSACTION_INDEX_NAME,
body: createTransactionEvent({
event: {
outcome: 'failure',
},
}),
refresh: true,
});
alert = await waitUntilNextExecution(alert);
const afterViolatingDataResponse = await es.search({
index: ALERTS_INDEX_TARGET,
body: {
query: {
match_all: {},
},
},
size: 1,
});
expect(afterViolatingDataResponse.body.hits.hits.length).to.be(1);
const alertEvent = afterViolatingDataResponse.body.hits.hits[0]._source as Record<
string,
any
>;
const toCompare = omit(
alertEvent,
'@timestamp',
'kibana.rac.alert.start',
'kibana.rac.alert.uuid',
'rule.uuid'
);
expectSnapshot(toCompare).toMatchInline(`
Object {
"event.action": "open",
"event.kind": "state",
"kibana.rac.alert.duration.us": 0,
"kibana.rac.alert.id": "apm.transaction_error_rate_opbeans-go_request",
"kibana.rac.alert.status": "open",
"kibana.rac.producer": "apm",
"rule.category": "Transaction error rate threshold",
"rule.id": "apm.transaction_error_rate",
"rule.name": "Transaction error rate threshold | opbeans-go",
"service.name": "opbeans-go",
"tags": Array [
"apm",
"service.name:opbeans-go",
],
"transaction.type": "request",
}
`);
});
});
});
registry.when('Rule registry with write not enabled', { config: 'basic', archives: [] }, () => {
it('does not bootstrap the apm rule indices', async () => {
const errorOrUndefined = await es.indices
.get({
index: ALERTS_INDEX_TARGET,
expand_wildcards: 'open',
allow_no_indices: false,
})
.then(() => {})
.catch((error) => {
return error.toString();
});
expect(errorOrUndefined).not.to.be(undefined);
expect(errorOrUndefined).to.be(`ResponseError: index_not_found_exception`);
});
});
}

View file

@ -24,6 +24,10 @@ export default function apmApiIntegrationTests(providerContext: FtrProviderConte
loadTestFile(require.resolve('./alerts/chart_preview'));
});
describe('alerts/rule_registry', function () {
loadTestFile(require.resolve('./alerts/rule_registry'));
});
describe('correlations/latency_slow_transactions', function () {
loadTestFile(require.resolve('./correlations/latency_slow_transactions'));
});