[ML] Check for error messages in the Anomaly Detection jobs health rule type (#108701)

* [ML] retrieve job errors

* [ML] account for previous execution time

* [ML] update default message

* [ML] update description

* [ML] update unit tests

* [ML] update unit tests

* [ML] update action name

* [ML] update errorMessages name

* [ML] update a default message to avoid line breaks

* [ML] update rule helper text

* [ML] refactor getJobsErrors

* [ML] perform errors check starting from the second execution
This commit is contained in:
Dima Arnautov 2021-08-17 16:21:44 +02:00 committed by GitHub
parent 9258ba5147
commit f243b0540d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 268 additions and 86 deletions

View file

@ -54,12 +54,12 @@ export const HEALTH_CHECK_NAMES: Record<JobsHealthTests, { name: string; descrip
},
errorMessages: {
name: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.errorMessagesCheckName', {
defaultMessage: 'There are errors in the job messages',
defaultMessage: 'Errors in job messages',
}),
description: i18n.translate(
'xpack.ml.alertTypes.jobsHealthAlertingRule.errorMessagesCheckDescription',
{
defaultMessage: 'There are errors in the job messages',
defaultMessage: 'Get alerted if a job contains errors in the job messages.',
}
),
},

View file

@ -95,6 +95,9 @@ describe('getResultJobsHealthRuleConfig', () => {
enabled: true,
timeInterval: null,
},
errorMessages: {
enabled: true,
},
});
});
test('returns config with overridden values based on provided configuration', () => {
@ -119,6 +122,9 @@ describe('getResultJobsHealthRuleConfig', () => {
enabled: true,
timeInterval: null,
},
errorMessages: {
enabled: true,
},
});
});
});

View file

@ -54,7 +54,7 @@ export function getTopNBuckets(job: Job): number {
return Math.ceil(narrowBucketLength / bucketSpan.asSeconds());
}
const implementedTests = ['datafeed', 'mml', 'delayedData'] as JobsHealthTests[];
const implementedTests = ['datafeed', 'mml', 'delayedData', 'errorMessages'] as JobsHealthTests[];
/**
* Returns tests configuration combined with default values.

View file

@ -21,7 +21,8 @@ export function registerJobsHealthAlertingRule(
triggersActionsUi.ruleTypeRegistry.register({
id: ML_ALERT_TYPES.AD_JOBS_HEALTH,
description: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.description', {
defaultMessage: 'Alert when anomaly detection jobs experience operational issues.',
defaultMessage:
'Alert when anomaly detection jobs experience operational issues. Enable suitable alerts for critically important jobs.',
}),
iconClass: 'bell',
documentationUrl(docLinks) {
@ -90,14 +91,15 @@ export function registerJobsHealthAlertingRule(
\\{\\{context.message\\}\\}
\\{\\{#context.results\\}\\}
Job ID: \\{\\{job_id\\}\\}
\\{\\{#datafeed_id\\}\\}Datafeed ID: \\{\\{datafeed_id\\}\\} \\{\\{/datafeed_id\\}\\}
\\{\\{#datafeed_state\\}\\}Datafeed state: \\{\\{datafeed_state\\}\\} \\{\\{/datafeed_state\\}\\}
\\{\\{#memory_status\\}\\}Memory status: \\{\\{memory_status\\}\\} \\{\\{/memory_status\\}\\}
\\{\\{#log_time\\}\\}Memory logging time: \\{\\{log_time\\}\\} \\{\\{/log_time\\}\\}
\\{\\{#failed_category_count\\}\\}Failed category count: \\{\\{failed_category_count\\}\\} \\{\\{/failed_category_count\\}\\}
\\{\\{#annotation\\}\\}Annotation: \\{\\{annotation\\}\\} \\{\\{/annotation\\}\\}
\\{\\{#missed_docs_count\\}\\}Number of missed documents: \\{\\{missed_docs_count\\}\\} \\{\\{/missed_docs_count\\}\\}
\\{\\{#end_timestamp\\}\\}Latest finalized bucket with missing docs: \\{\\{end_timestamp\\}\\} \\{\\{/end_timestamp\\}\\}
\\{\\{#datafeed_id\\}\\}Datafeed ID: \\{\\{datafeed_id\\}\\}
\\{\\{/datafeed_id\\}\\} \\{\\{#datafeed_state\\}\\}Datafeed state: \\{\\{datafeed_state\\}\\}
\\{\\{/datafeed_state\\}\\} \\{\\{#memory_status\\}\\}Memory status: \\{\\{memory_status\\}\\}
\\{\\{/memory_status\\}\\} \\{\\{#log_time\\}\\}Memory logging time: \\{\\{log_time\\}\\}
\\{\\{/log_time\\}\\} \\{\\{#failed_category_count\\}\\}Failed category count: \\{\\{failed_category_count\\}\\}
\\{\\{/failed_category_count\\}\\} \\{\\{#annotation\\}\\}Annotation: \\{\\{annotation\\}\\}
\\{\\{/annotation\\}\\} \\{\\{#missed_docs_count\\}\\}Number of missed documents: \\{\\{missed_docs_count\\}\\}
\\{\\{/missed_docs_count\\}\\} \\{\\{#end_timestamp\\}\\}Latest finalized bucket with missing docs: \\{\\{end_timestamp\\}\\}
\\{\\{/end_timestamp\\}\\} \\{\\{#errors\\}\\}Error message: \\{\\{message\\}\\} \\{\\{/errors\\}\\}
\\{\\{/context.results\\}\\}
`,
}

View file

@ -11,9 +11,39 @@ import type { Logger } from 'kibana/server';
import { MlClient } from '../ml_client';
import { MlJob, MlJobStats } from '@elastic/elasticsearch/api/types';
import { AnnotationService } from '../../models/annotation_service/annotation';
import { JobsHealthExecutorOptions } from './register_jobs_monitoring_rule_type';
import { JobAuditMessagesService } from '../../models/job_audit_messages/job_audit_messages';
import { DeepPartial } from '../../../common/types/common';
const MOCK_DATE_NOW = 1487076708000;
function getDefaultExecutorOptions(
overrides: DeepPartial<JobsHealthExecutorOptions> = {}
): JobsHealthExecutorOptions {
return ({
state: {},
startedAt: new Date('2021-08-12T13:13:39.396Z'),
previousStartedAt: new Date('2021-08-12T13:13:27.396Z'),
spaceId: 'default',
namespace: undefined,
name: 'ml-health-check',
tags: [],
createdBy: 'elastic',
updatedBy: 'elastic',
rule: {
name: 'ml-health-check',
tags: [],
consumer: 'alerts',
producer: 'ml',
ruleTypeId: 'xpack.ml.anomaly_detection_jobs_health',
ruleTypeName: 'Anomaly detection jobs health',
enabled: true,
schedule: { interval: '10s' },
},
...overrides,
} as unknown) as JobsHealthExecutorOptions;
}
describe('JobsHealthService', () => {
const mlClient = ({
getJobs: jest.fn().mockImplementation(({ job_id: jobIds = [] }) => {
@ -117,6 +147,12 @@ describe('JobsHealthService', () => {
}),
} as unknown) as jest.Mocked<AnnotationService>;
const jobAuditMessagesService = ({
getJobsErrors: jest.fn().mockImplementation((jobIds: string) => {
return Promise.resolve({});
}),
} as unknown) as jest.Mocked<JobAuditMessagesService>;
const logger = ({
warn: jest.fn(),
info: jest.fn(),
@ -127,6 +163,7 @@ describe('JobsHealthService', () => {
mlClient,
datafeedsService,
annotationService,
jobAuditMessagesService,
logger
);
@ -143,42 +180,52 @@ describe('JobsHealthService', () => {
test('returns empty results when no jobs provided', async () => {
// act
const executionResult = await jobHealthService.getTestsResults('testRule', {
testsConfig: null,
includeJobs: {
jobIds: ['*'],
groupIds: [],
},
excludeJobs: null,
});
const executionResult = await jobHealthService.getTestsResults(
getDefaultExecutorOptions({
rule: { name: 'testRule' },
params: {
testsConfig: null,
includeJobs: {
jobIds: ['*'],
groupIds: [],
},
excludeJobs: null,
},
})
);
expect(logger.warn).toHaveBeenCalledWith('Rule "testRule" does not have associated jobs.');
expect(datafeedsService.getDatafeedByJobId).not.toHaveBeenCalled();
expect(executionResult).toEqual([]);
});
test('returns empty results and does not perform datafeed check when test is disabled', async () => {
const executionResult = await jobHealthService.getTestsResults('testRule', {
testsConfig: {
datafeed: {
enabled: false,
const executionResult = await jobHealthService.getTestsResults(
getDefaultExecutorOptions({
rule: { name: 'testRule' },
params: {
testsConfig: {
datafeed: {
enabled: false,
},
behindRealtime: null,
delayedData: {
enabled: false,
docsCount: null,
timeInterval: null,
},
errorMessages: null,
mml: {
enabled: false,
},
},
includeJobs: {
jobIds: ['test_job_01'],
groupIds: [],
},
excludeJobs: null,
},
behindRealtime: null,
delayedData: {
enabled: false,
docsCount: null,
timeInterval: null,
},
errorMessages: null,
mml: {
enabled: false,
},
},
includeJobs: {
jobIds: ['test_job_01'],
groupIds: [],
},
excludeJobs: null,
});
})
);
expect(logger.warn).not.toHaveBeenCalled();
expect(logger.debug).toHaveBeenCalledWith(`Performing health checks for job IDs: test_job_01`);
expect(datafeedsService.getDatafeedByJobId).not.toHaveBeenCalled();
@ -186,27 +233,32 @@ describe('JobsHealthService', () => {
});
test('takes into account delayed data params', async () => {
const executionResult = await jobHealthService.getTestsResults('testRule_04', {
testsConfig: {
delayedData: {
enabled: true,
docsCount: 10,
timeInterval: '4h',
const executionResult = await jobHealthService.getTestsResults(
getDefaultExecutorOptions({
rule: { name: 'testRule_04' },
params: {
testsConfig: {
delayedData: {
enabled: true,
docsCount: 10,
timeInterval: '4h',
},
behindRealtime: { enabled: false, timeInterval: null },
mml: { enabled: false },
datafeed: { enabled: false },
errorMessages: { enabled: false },
},
includeJobs: {
jobIds: [],
groupIds: ['test_group'],
},
excludeJobs: {
jobIds: ['test_job_03'],
groupIds: [],
},
},
behindRealtime: { enabled: false, timeInterval: null },
mml: { enabled: false },
datafeed: { enabled: false },
errorMessages: { enabled: false },
},
includeJobs: {
jobIds: [],
groupIds: ['test_group'],
},
excludeJobs: {
jobIds: ['test_job_03'],
groupIds: [],
},
});
})
);
expect(annotationService.getDelayedDataAnnotations).toHaveBeenCalledWith({
jobIds: ['test_job_01', 'test_job_02'],
@ -234,17 +286,22 @@ describe('JobsHealthService', () => {
});
test('returns results based on provided selection', async () => {
const executionResult = await jobHealthService.getTestsResults('testRule_03', {
testsConfig: null,
includeJobs: {
jobIds: [],
groupIds: ['test_group'],
},
excludeJobs: {
jobIds: ['test_job_03'],
groupIds: [],
},
});
const executionResult = await jobHealthService.getTestsResults(
getDefaultExecutorOptions({
rule: { name: 'testRule_03' },
params: {
testsConfig: null,
includeJobs: {
jobIds: [],
groupIds: ['test_group'],
},
excludeJobs: {
jobIds: ['test_job_03'],
groupIds: [],
},
},
})
);
expect(logger.warn).not.toHaveBeenCalled();
expect(logger.debug).toHaveBeenCalledWith(
`Performing health checks for job IDs: test_job_01, test_job_02`

View file

@ -11,10 +11,7 @@ import { i18n } from '@kbn/i18n';
import { Logger } from 'kibana/server';
import { MlJob } from '@elastic/elasticsearch/api/types';
import { MlClient } from '../ml_client';
import {
AnomalyDetectionJobsHealthRuleParams,
JobSelection,
} from '../../routes/schemas/alerting_schema';
import { JobSelection } from '../../routes/schemas/alerting_schema';
import { datafeedsProvider, DatafeedsService } from '../../models/job_service/datafeeds';
import { ALL_JOBS_SELECTION, HEALTH_CHECK_NAMES } from '../../../common/constants/alerts';
import { DatafeedStats } from '../../../common/types/anomaly_detection_jobs';
@ -22,6 +19,7 @@ import { GetGuards } from '../../shared_services/shared_services';
import {
AnomalyDetectionJobsHealthAlertContext,
DelayedDataResponse,
JobsHealthExecutorOptions,
MmlTestResponse,
NotStartedDatafeedResponse,
} from './register_jobs_monitoring_rule_type';
@ -33,6 +31,10 @@ import { AnnotationService } from '../../models/annotation_service/annotation';
import { annotationServiceProvider } from '../../models/annotation_service';
import { parseInterval } from '../../../common/util/parse_interval';
import { isDefined } from '../../../common/types/guards';
import {
jobAuditMessagesProvider,
JobAuditMessagesService,
} from '../../models/job_audit_messages/job_audit_messages';
interface TestResult {
name: string;
@ -45,6 +47,7 @@ export function jobsHealthServiceProvider(
mlClient: MlClient,
datafeedsService: DatafeedsService,
annotationService: AnnotationService,
jobAuditMessagesService: JobAuditMessagesService,
logger: Logger
) {
/**
@ -236,13 +239,25 @@ export function jobsHealthServiceProvider(
return annotations;
},
/**
* Retrieves a list of the latest errors per jobs.
* @param jobIds List of job IDs.
* @param previousStartedAt Time of the previous rule execution. As we intend to notify
* about an error only once, limit the scope of the errors search.
*/
async getErrorsReport(jobIds: string[], previousStartedAt: Date) {
return await jobAuditMessagesService.getJobsErrors(jobIds, previousStartedAt.getTime());
},
/**
* Retrieves report grouped by test.
*/
async getTestsResults(
ruleInstanceName: string,
{ testsConfig, includeJobs, excludeJobs }: AnomalyDetectionJobsHealthRuleParams
): Promise<TestsResults> {
async getTestsResults(executorOptions: JobsHealthExecutorOptions): Promise<TestsResults> {
const {
rule,
previousStartedAt,
params: { testsConfig, includeJobs, excludeJobs },
} = executorOptions;
const config = getResultJobsHealthRuleConfig(testsConfig);
const results: TestsResults = [];
@ -251,7 +266,7 @@ export function jobsHealthServiceProvider(
const jobIds = getJobIds(jobs);
if (jobIds.length === 0) {
logger.warn(`Rule "${ruleInstanceName}" does not have associated jobs.`);
logger.warn(`Rule "${rule.name}" does not have associated jobs.`);
return results;
}
@ -334,6 +349,26 @@ export function jobsHealthServiceProvider(
}
}
if (config.errorMessages.enabled && previousStartedAt) {
const response = await this.getErrorsReport(jobIds, previousStartedAt);
if (response.length > 0) {
results.push({
name: HEALTH_CHECK_NAMES.errorMessages.name,
context: {
results: response,
message: i18n.translate(
'xpack.ml.alertTypes.jobsHealthAlertingRule.errorMessagesMessage',
{
defaultMessage:
'{jobsCount, plural, one {# job contains} other {# jobs contain}} errors in the messages.',
values: { jobsCount: response.length },
}
),
},
});
}
}
return results;
},
};
@ -360,6 +395,7 @@ export function getJobsHealthServiceProvider(getGuards: GetGuards) {
mlClient,
datafeedsProvider(scopedClient, mlClient),
annotationServiceProvider(scopedClient),
jobAuditMessagesProvider(scopedClient, mlClient),
logger
).getTestsResults(...args)
);

View file

@ -22,6 +22,8 @@ import {
AlertInstanceState,
AlertTypeState,
} from '../../../../alerting/common';
import { JobsErrorsResponse } from '../../models/job_audit_messages/job_audit_messages';
import { AlertExecutorOptions } from '../../../../alerting/server';
type ModelSizeStats = MlJobStats['model_size_stats'];
@ -55,7 +57,8 @@ export interface DelayedDataResponse {
export type AnomalyDetectionJobHealthResult =
| MmlTestResponse
| NotStartedDatafeedResponse
| DelayedDataResponse;
| DelayedDataResponse
| JobsErrorsResponse[number];
export type AnomalyDetectionJobsHealthAlertContext = {
results: AnomalyDetectionJobHealthResult[];
@ -69,10 +72,18 @@ export type AnomalyDetectionJobRealtimeIssue = typeof ANOMALY_DETECTION_JOB_REAL
export const REALTIME_ISSUE_DETECTED: ActionGroup<AnomalyDetectionJobRealtimeIssue> = {
id: ANOMALY_DETECTION_JOB_REALTIME_ISSUE,
name: i18n.translate('xpack.ml.jobsHealthAlertingRule.actionGroupName', {
defaultMessage: 'Real-time issue detected',
defaultMessage: 'Issue detected',
}),
};
export type JobsHealthExecutorOptions = AlertExecutorOptions<
AnomalyDetectionJobsHealthRuleParams,
Record<string, unknown>,
Record<string, unknown>,
AnomalyDetectionJobsHealthAlertContext,
AnomalyDetectionJobRealtimeIssue
>;
export function registerJobsMonitoringRuleType({
alerting,
mlServicesProviders,
@ -120,14 +131,16 @@ export function registerJobsMonitoringRuleType({
producer: PLUGIN_ID,
minimumLicenseRequired: MINIMUM_FULL_LICENSE,
isExportable: true,
async executor({ services, params, alertId, state, previousStartedAt, startedAt, name, rule }) {
async executor(options) {
const { services, name } = options;
const fakeRequest = {} as KibanaRequest;
const { getTestsResults } = mlServicesProviders.jobsHealthServiceProvider(
services.savedObjectsClient,
fakeRequest,
logger
);
const executionResult = await getTestsResults(name, params);
const executionResult = await getTestsResults(options);
if (executionResult.length > 0) {
logger.info(

View file

@ -54,6 +54,10 @@ export function isClearable(index?: string): boolean {
return false;
}
export type JobsErrorsResponse = Array<{ job_id: string; errors: JobMessage[] }>;
export type JobAuditMessagesService = ReturnType<typeof jobAuditMessagesProvider>;
export function jobAuditMessagesProvider(
{ asInternalUser }: IScopedClusterClient,
mlClient: MlClient
@ -178,7 +182,10 @@ export function jobAuditMessagesProvider(
return { messages, notificationIndices };
}
// search highest, most recent audit messages for all jobs for the last 24hrs.
/**
* Search highest, most recent audit messages for all jobs for the last 24hrs.
* @param jobIds
*/
async function getAuditMessagesSummary(jobIds: string[]): Promise<AuditMessage[]> {
// TODO This is the current default value of the cluster setting `search.max_buckets`.
// This should possibly consider the real settings in a future update.
@ -400,9 +407,70 @@ export function jobAuditMessagesProvider(
return (Object.keys(LEVEL) as LevelName[])[Object.values(LEVEL).indexOf(level)];
}
/**
* Retrieve list of errors per job.
* @param jobIds
*/
async function getJobsErrors(jobIds: string[], earliestMs?: number): Promise<JobsErrorsResponse> {
const { body } = await asInternalUser.search({
index: ML_NOTIFICATION_INDEX_PATTERN,
ignore_unavailable: true,
size: 0,
body: {
query: {
bool: {
filter: [
...(earliestMs ? [{ range: { timestamp: { gte: earliestMs } } }] : []),
{ terms: { job_id: jobIds } },
{
term: { level: { value: MESSAGE_LEVEL.ERROR } },
},
],
},
},
aggs: {
by_job: {
terms: {
field: 'job_id',
size: jobIds.length,
},
aggs: {
latest_errors: {
top_hits: {
size: 10,
sort: [
{
timestamp: {
order: 'desc',
},
},
],
},
},
},
},
},
},
});
const errors = body.aggregations!.by_job as estypes.AggregationsTermsAggregate<{
key: string;
doc_count: number;
latest_errors: Pick<estypes.SearchResponse<JobMessage>, 'hits'>;
}>;
return errors.buckets.map((bucket) => {
return {
job_id: bucket.key,
errors: bucket.latest_errors.hits.hits.map((v) => v._source!),
};
});
}
return {
getJobAuditMessages,
getAuditMessagesSummary,
clearJobAuditMessages,
getJobsErrors,
};
}