[ML] Improve support for script and aggregation fields in anomaly detection jobs (#81923)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Quynh Nguyen 2020-11-17 11:34:15 -06:00 committed by GitHub
parent 46d587a19f
commit 55119c2152
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 345 additions and 62 deletions

View file

@ -442,6 +442,16 @@ export const getMessages = once(() => {
url:
'https://www.elastic.co/guide/en/elasticsearch/reference/{{version}}/ml-job-resource.html#ml-job-resource',
},
missing_summary_count_field_name: {
status: VALIDATION_STATUS.ERROR,
text: i18n.translate(
'xpack.ml.models.jobValidation.messages.missingSummaryCountFieldNameMessage',
{
defaultMessage:
'A job configured with a datafeed with aggregations must set summary_count_field_name; use doc_count or suitable alternative.',
}
),
},
skipped_extended_tests: {
status: VALIDATION_STATUS.WARNING,
text: i18n.translate('xpack.ml.models.jobValidation.messages.skippedExtendedTestsMessage', {

View file

@ -19,7 +19,7 @@ export interface Datafeed {
job_id: JobId;
query: object;
query_delay?: string;
script_fields?: object;
script_fields?: Record<string, any>;
scroll_size?: number;
delayed_data_check_config?: object;
indices_options?: IndicesOptions;
@ -30,16 +30,17 @@ export interface ChunkingConfig {
time_span?: string;
}
interface Aggregation {
buckets: {
export type Aggregation = Record<
string,
{
date_histogram: {
field: string;
fixed_interval: string;
};
aggregations?: { [key: string]: any };
aggs?: { [key: string]: any };
};
}
}
>;
interface IndicesOptions {
expand_wildcards?: 'all' | 'open' | 'closed' | 'hidden' | 'none';

View file

@ -89,3 +89,16 @@ export const mlCategory: Field = {
type: ES_FIELD_TYPES.KEYWORD,
aggregatable: false,
};
/** Body of a cardinality aggregation over a concrete document field. */
export interface FieldAggCardinality {
  field: string;
  // NOTE(review): untyped; presumably for sampled/approximate cardinality — confirm against callers.
  percent?: any;
}
/** Body of a cardinality aggregation computed from a script (e.g. a datafeed script_field). */
export interface ScriptAggCardinality {
  script: any;
}
/** Elasticsearch cardinality aggregation: either field-based or script-based. */
export interface AggCardinality {
  cardinality: FieldAggCardinality | ScriptAggCardinality;
}

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Aggregation, Datafeed } from '../types/anomaly_detection_jobs';
/**
 * Returns the aggregations configured on a datafeed, checking both spellings
 * Elasticsearch accepts (`aggregations` first, then `aggs`).
 * Returns undefined when no config is given or neither property is set.
 */
export const getDatafeedAggregations = (
  datafeedConfig: Partial<Datafeed> | undefined
): Aggregation | undefined => {
  const aggregations = datafeedConfig?.aggregations;
  if (aggregations !== undefined) {
    return aggregations;
  }
  return datafeedConfig?.aggs;
};
/**
 * Returns the name of the outermost aggregation in a datafeed aggregations
 * object, e.g. 'buckets' for `{ buckets: { ... } }`. The top-level key is
 * user-chosen (not necessarily 'buckets'), so the first key is returned.
 * Returns undefined for null, non-object, or empty input.
 */
export const getAggregationBucketsName = (aggregations: any): string | undefined => {
  // Guard against null explicitly: typeof null === 'object' and Object.keys(null) throws.
  if (aggregations !== null && typeof aggregations === 'object') {
    const keys = Object.keys(aggregations);
    return keys.length > 0 ? keys[0] : undefined;
  }
  return undefined;
};

View file

@ -188,8 +188,8 @@ describe('ML - job utils', () => {
expect(isTimeSeriesViewDetector(job, 3)).toBe(false);
});
test('returns false for a detector using a script field as a metric field_name', () => {
expect(isTimeSeriesViewDetector(job, 4)).toBe(false);
test('returns true for a detector using a script field as a metric field_name', () => {
expect(isTimeSeriesViewDetector(job, 4)).toBe(true);
});
});
@ -281,6 +281,7 @@ describe('ML - job utils', () => {
expect(isSourceDataChartableForDetector(job, 22)).toBe(true);
expect(isSourceDataChartableForDetector(job, 23)).toBe(true);
expect(isSourceDataChartableForDetector(job, 24)).toBe(true);
expect(isSourceDataChartableForDetector(job, 37)).toBe(true);
});
test('returns false for expected detectors', () => {
@ -296,7 +297,6 @@ describe('ML - job utils', () => {
expect(isSourceDataChartableForDetector(job, 34)).toBe(false);
expect(isSourceDataChartableForDetector(job, 35)).toBe(false);
expect(isSourceDataChartableForDetector(job, 36)).toBe(false);
expect(isSourceDataChartableForDetector(job, 37)).toBe(false);
});
});

View file

@ -20,6 +20,7 @@ import { MlServerLimits } from '../types/ml_server_info';
import { JobValidationMessage, JobValidationMessageId } from '../constants/messages';
import { ES_AGGREGATION, ML_JOB_AGGREGATION } from '../constants/aggregation_types';
import { MLCATEGORY } from '../constants/field_types';
import { getDatafeedAggregations } from './datafeed_utils';
export interface ValidationResults {
valid: boolean;
@ -94,7 +95,6 @@ export function isSourceDataChartableForDetector(job: CombinedJob, detectorIndex
// Perform extra check to see if the detector is using a scripted field.
const scriptFields = Object.keys(job.datafeed_config.script_fields);
isSourceDataChartable =
scriptFields.indexOf(dtr.field_name!) === -1 &&
scriptFields.indexOf(dtr.partition_field_name!) === -1 &&
scriptFields.indexOf(dtr.by_field_name!) === -1 &&
scriptFields.indexOf(dtr.over_field_name!) === -1;
@ -559,6 +559,27 @@ export function basicDatafeedValidation(datafeed: Datafeed): ValidationResults {
};
}
export function basicJobAndDatafeedValidation(job: Job, datafeed: Datafeed): ValidationResults {
const messages: ValidationResults['messages'] = [];
let valid = true;
if (datafeed && job) {
const datafeedAggregations = getDatafeedAggregations(datafeed);
if (datafeedAggregations !== undefined && !job.analysis_config?.summary_count_field_name) {
valid = false;
messages.push({ id: 'missing_summary_count_field_name' });
}
}
return {
messages,
valid,
contains: (id) => messages.some((m) => id === m.id),
find: (id) => messages.find((m) => id === m.id),
};
}
export function validateModelMemoryLimit(job: Job, limits: MlServerLimits): ValidationResults {
const messages: ValidationResults['messages'] = [];
let valid = true;

View file

@ -31,3 +31,22 @@ export function isValidJson(json: string) {
return false;
}
}
/**
 * Recursively searches an aggregations object for a key named `fieldName`
 * and returns its value, or undefined when the key is not present anywhere
 * in the tree. Depth-first; the first match wins.
 */
export function findAggField(aggs: Record<string, any>, fieldName: string): any {
  let value;
  Object.keys(aggs).some(function (k) {
    if (k === fieldName) {
      value = aggs[k];
      return true;
    }
    const child = aggs[k];
    // Guard against null: typeof null === 'object', and recursing into null
    // would throw from Object.keys. (hasOwnProperty is redundant for keys
    // obtained from Object.keys, so it is omitted.)
    if (child !== null && typeof child === 'object') {
      value = findAggField(child, fieldName);
      return value !== undefined;
    }
    // Explicitly keep scanning siblings (avoids an implicit undefined return).
    return false;
  });
  return value;
}
/** True when `fieldName` exists anywhere (recursively) in the aggregations object. */
export function isValidAggregationField(aggs: Record<string, any>, fieldName: string): boolean {
  const match = findAggField(aggs, fieldName);
  return match !== undefined;
}

View file

@ -123,7 +123,8 @@ export const anomalyDataChange = function (
config.timeField,
range.min,
range.max,
bucketSpanSeconds * 1000
bucketSpanSeconds * 1000,
config.datafeedConfig
)
.toPromise();
} else {

View file

@ -134,6 +134,7 @@ export const useModelMemoryEstimator = (
// Update model memory estimation payload on the job creator updates
useEffect(() => {
modelMemoryEstimator.update({
datafeedConfig: jobCreator.datafeedConfig,
analysisConfig: jobCreator.jobConfig.analysis_config,
indexPattern: jobCreator.indexPatternTitle,
query: jobCreator.datafeedConfig.query,

View file

@ -10,6 +10,7 @@ import { map, startWith, tap } from 'rxjs/operators';
import {
basicJobValidation,
basicDatafeedValidation,
basicJobAndDatafeedValidation,
} from '../../../../../../common/util/job_utils';
import { getNewJobLimits } from '../../../../services/ml_server_info';
import { JobCreator, JobCreatorType, isCategorizationJobCreator } from '../job_creator';
@ -53,6 +54,7 @@ export interface BasicValidations {
scrollSize: Validation;
categorizerMissingPerPartition: Validation;
categorizerVaryingPerPartitionField: Validation;
summaryCountField: Validation;
}
export interface AdvancedValidations {
@ -80,6 +82,7 @@ export class JobValidator {
scrollSize: { valid: true },
categorizerMissingPerPartition: { valid: true },
categorizerVaryingPerPartitionField: { valid: true },
summaryCountField: { valid: true },
};
private _advancedValidations: AdvancedValidations = {
categorizationFieldValid: { valid: true },
@ -197,6 +200,14 @@ export class JobValidator {
datafeedConfig
);
const basicJobAndDatafeedResults = basicJobAndDatafeedValidation(jobConfig, datafeedConfig);
populateValidationMessages(
basicJobAndDatafeedResults,
this._basicValidations,
jobConfig,
datafeedConfig
);
// run addition job and group id validation
const idResults = checkForExistingJobAndGroupIds(
this._jobCreator.jobId,
@ -228,6 +239,9 @@ export class JobValidator {
public get bucketSpan(): Validation {
return this._basicValidations.bucketSpan;
}
public get summaryCountField(): Validation {
return this._basicValidations.summaryCountField;
}
public get duplicateDetectors(): Validation {
return this._basicValidations.duplicateDetectors;
@ -297,6 +311,7 @@ export class JobValidator {
this.duplicateDetectors.valid &&
this.categorizerMissingPerPartition.valid &&
this.categorizerVaryingPerPartitionField.valid &&
this.summaryCountField.valid &&
!this.validating &&
(this._jobCreator.type !== JOB_TYPE.CATEGORIZATION ||
(this._jobCreator.type === JOB_TYPE.CATEGORIZATION && this.categorizationField))

View file

@ -193,6 +193,15 @@ export function populateValidationMessages(
basicValidations.frequency.valid = false;
basicValidations.frequency.message = invalidTimeIntervalMessage(datafeedConfig.frequency);
}
if (validationResults.contains('missing_summary_count_field_name')) {
basicValidations.summaryCountField.valid = false;
basicValidations.summaryCountField.message = i18n.translate(
'xpack.ml.newJob.wizard.validateJob.summaryCountFieldMissing',
{
defaultMessage: 'Required field as the datafeed uses aggregations.',
}
);
}
}
export function checkForExistingJobAndGroupIds(

View file

@ -61,9 +61,12 @@ export const DatafeedPreview: FC<{
if (combinedJob.datafeed_config && combinedJob.datafeed_config.indices.length) {
try {
const resp = await mlJobService.searchPreview(combinedJob);
const data = resp.aggregations
? resp.aggregations.buckets.buckets.slice(0, ML_DATA_PREVIEW_COUNT)
: resp.hits.hits;
let data = resp.hits.hits;
// the first item under aggregations can be any name
if (typeof resp.aggregations === 'object' && Object.keys(resp.aggregations).length > 0) {
const accessor = Object.keys(resp.aggregations)[0];
data = resp.aggregations[accessor].buckets.slice(0, ML_DATA_PREVIEW_COUNT);
}
setPreviewJsonString(JSON.stringify(data, null, 2));
} catch (error) {

View file

@ -7,23 +7,44 @@
import React, { memo, FC } from 'react';
import { i18n } from '@kbn/i18n';
import { FormattedMessage } from '@kbn/i18n/react';
import { EuiDescribedFormGroup, EuiFormRow } from '@elastic/eui';
import { EuiDescribedFormGroup, EuiFormRow, EuiLink } from '@elastic/eui';
import { Validation } from '../../../../../common/job_validator';
import { useMlKibana } from '../../../../../../../contexts/kibana';
export const Description: FC = memo(({ children }) => {
interface Props {
validation: Validation;
}
export const Description: FC<Props> = memo(({ children, validation }) => {
const title = i18n.translate('xpack.ml.newJob.wizard.pickFieldsStep.summaryCountField.title', {
defaultMessage: 'Summary count field',
});
const {
services: { docLinks },
} = useMlKibana();
const { ELASTIC_WEBSITE_URL, DOC_LINK_VERSION } = docLinks;
const docsUrl = `${ELASTIC_WEBSITE_URL}guide/en/machine-learning/${DOC_LINK_VERSION}/ml-configuring-aggregation.html`;
return (
<EuiDescribedFormGroup
title={<h3>{title}</h3>}
description={
<FormattedMessage
id="xpack.ml.newJob.wizard.pickFieldsStep.summaryCountField.description"
defaultMessage="Optional, for use if input data has been pre-summarized e.g. \{docCountParam\}."
defaultMessage="If the input data is {aggregated}, specify the field that contains the document count."
values={{
aggregated: (
<EuiLink href={docsUrl} target="_blank">
<FormattedMessage
id="xpack.ml.newJob.wizard.pickFieldsStep.summaryCountField.aggregatedText"
defaultMessage="aggregated"
/>
</EuiLink>
),
}}
/>
}
>
<EuiFormRow label={title}>
<EuiFormRow label={title} error={validation.message} isInvalid={validation.valid === false}>
<>{children}</>
</EuiFormRow>
</EuiDescribedFormGroup>

View file

@ -17,13 +17,23 @@ import {
import { Description } from './description';
export const SummaryCountField: FC = () => {
const { jobCreator: jc, jobCreatorUpdate, jobCreatorUpdated } = useContext(JobCreatorContext);
const {
jobCreator: jc,
jobCreatorUpdate,
jobCreatorUpdated,
jobValidator,
jobValidatorUpdated,
} = useContext(JobCreatorContext);
const jobCreator = jc as MultiMetricJobCreator | PopulationJobCreator | AdvancedJobCreator;
const { fields } = newJobCapsService;
const [summaryCountFieldName, setSummaryCountFieldName] = useState(
jobCreator.summaryCountFieldName
);
const [validation, setValidation] = useState(jobValidator.summaryCountField);
useEffect(() => {
setValidation(jobValidator.summaryCountField);
}, [jobValidatorUpdated]);
useEffect(() => {
jobCreator.summaryCountFieldName = summaryCountFieldName;
@ -35,7 +45,7 @@ export const SummaryCountField: FC = () => {
}, [jobCreatorUpdated]);
return (
<Description>
<Description validation={validation}>
<SummaryCountFieldSelect
fields={fields}
changeHandler={setSummaryCountFieldName}

View file

@ -628,6 +628,7 @@ export function mlApiServicesProvider(httpService: HttpService) {
},
calculateModelMemoryLimit$({
datafeedConfig,
analysisConfig,
indexPattern,
query,
@ -635,6 +636,7 @@ export function mlApiServicesProvider(httpService: HttpService) {
earliestMs,
latestMs,
}: {
datafeedConfig?: Datafeed;
analysisConfig: AnalysisConfig;
indexPattern: string;
query: any;
@ -643,6 +645,7 @@ export function mlApiServicesProvider(httpService: HttpService) {
latestMs: number;
}) {
const body = JSON.stringify({
datafeedConfig,
analysisConfig,
indexPattern,
query,

View file

@ -16,9 +16,11 @@ import { map } from 'rxjs/operators';
import { each, get } from 'lodash';
import { Dictionary } from '../../../../common/types/common';
import { ML_MEDIAN_PERCENTS } from '../../../../common/util/job_utils';
import { JobId } from '../../../../common/types/anomaly_detection_jobs';
import { Datafeed, JobId } from '../../../../common/types/anomaly_detection_jobs';
import { MlApiServices } from '../ml_api_service';
import { CriteriaField } from './index';
import { findAggField } from '../../../../common/util/validation_utils';
import { getDatafeedAggregations } from '../../../../common/util/datafeed_utils';
import { aggregationTypeTransform } from '../../../../common/util/anomaly_utils';
interface ResultResponse {
@ -69,8 +71,12 @@ export function resultsServiceRxProvider(mlApiServices: MlApiServices) {
timeFieldName: string,
earliestMs: number,
latestMs: number,
intervalMs: number
intervalMs: number,
datafeedConfig?: Datafeed
): Observable<MetricData> {
const scriptFields = datafeedConfig?.script_fields;
const aggFields = getDatafeedAggregations(datafeedConfig);
// Build the criteria to use in the bool filter part of the request.
// Add criteria for the time range, entity fields,
// plus any additional supplied query.
@ -151,15 +157,35 @@ export function resultsServiceRxProvider(mlApiServices: MlApiServices) {
body.aggs.byTime.aggs = {};
const metricAgg: any = {
[metricFunction]: {
field: metricFieldName,
},
[metricFunction]: {},
};
if (scriptFields !== undefined && scriptFields[metricFieldName] !== undefined) {
metricAgg[metricFunction].script = scriptFields[metricFieldName].script;
} else {
metricAgg[metricFunction].field = metricFieldName;
}
if (metricFunction === 'percentiles') {
metricAgg[metricFunction].percents = [ML_MEDIAN_PERCENTS];
}
body.aggs.byTime.aggs.metric = metricAgg;
// when the field is an aggregation field, because the field doesn't actually exist in the indices
// we need to pass all the sub aggs from the original datafeed config
// so that we can access the aggregated field
if (typeof aggFields === 'object' && Object.keys(aggFields).length > 0) {
// first item under aggregations can be any name, not necessarily 'buckets'
const accessor = Object.keys(aggFields)[0];
const tempAggs = { ...(aggFields[accessor].aggs ?? aggFields[accessor].aggregations) };
const foundValue = findAggField(tempAggs, metricFieldName);
if (foundValue !== undefined) {
tempAggs.metric = foundValue;
delete tempAggs[metricFieldName];
}
body.aggs.byTime.aggs = tempAggs;
} else {
body.aggs.byTime.aggs.metric = metricAgg;
}
}
return mlApiServices.esSearch$({ index, body }).pipe(

View file

@ -286,7 +286,7 @@ export function resultsServiceProvider(mlApiServices) {
influencerFieldValues: {
terms: {
field: 'influencer_field_value',
size: maxFieldValues,
size: !!maxFieldValues ? maxFieldValues : ANOMALY_SWIM_LANE_HARD_LIMIT,
order: {
maxAnomalyScore: 'desc',
},
@ -416,7 +416,7 @@ export function resultsServiceProvider(mlApiServices) {
influencerFieldValues: {
terms: {
field: 'influencer_field_value',
size: maxResults !== undefined ? maxResults : 2,
size: !!maxResults ? maxResults : 2,
order: {
maxAnomalyScore: 'desc',
},

View file

@ -94,7 +94,8 @@ function getMetricData(
chartConfig.timeField,
earliestMs,
latestMs,
intervalMs
intervalMs,
chartConfig?.datafeedConfig
)
.pipe(
map((resp) => {

View file

@ -7,7 +7,7 @@
import numeral from '@elastic/numeral';
import { IScopedClusterClient } from 'kibana/server';
import { MLCATEGORY } from '../../../common/constants/field_types';
import { AnalysisConfig } from '../../../common/types/anomaly_detection_jobs';
import { AnalysisConfig, Datafeed } from '../../../common/types/anomaly_detection_jobs';
import { fieldsServiceProvider } from '../fields_service';
import { MlInfoResponse } from '../../../common/types/ml_server_info';
import type { MlClient } from '../../lib/ml_client';
@ -46,7 +46,8 @@ const cardinalityCheckProvider = (client: IScopedClusterClient) => {
query: any,
timeFieldName: string,
earliestMs: number,
latestMs: number
latestMs: number,
datafeedConfig?: Datafeed
): Promise<{
overallCardinality: { [key: string]: number };
maxBucketCardinality: { [key: string]: number };
@ -101,7 +102,8 @@ const cardinalityCheckProvider = (client: IScopedClusterClient) => {
query,
timeFieldName,
earliestMs,
latestMs
latestMs,
datafeedConfig
);
}
@ -142,7 +144,8 @@ export function calculateModelMemoryLimitProvider(
timeFieldName: string,
earliestMs: number,
latestMs: number,
allowMMLGreaterThanMax = false
allowMMLGreaterThanMax = false,
datafeedConfig?: Datafeed
): Promise<ModelMemoryEstimationResult> {
const { body: info } = await mlClient.info<MlInfoResponse>();
const maxModelMemoryLimit = info.limits.max_model_memory_limit?.toUpperCase();
@ -154,7 +157,8 @@ export function calculateModelMemoryLimitProvider(
query,
timeFieldName,
earliestMs,
latestMs
latestMs,
datafeedConfig
);
const { body } = await mlClient.estimateModelMemory<ModelMemoryEstimateResponse>({

View file

@ -15,6 +15,9 @@ import {
buildSamplerAggregation,
getSamplerAggregationsResponsePath,
} from '../../lib/query_utils';
import { AggCardinality } from '../../../common/types/fields';
import { getDatafeedAggregations } from '../../../common/util/datafeed_utils';
import { Datafeed } from '../../../common/types/anomaly_detection_jobs';
const SAMPLER_TOP_TERMS_THRESHOLD = 100000;
const SAMPLER_TOP_TERMS_SHARD_SIZE = 5000;
@ -121,12 +124,6 @@ interface AggHistogram {
};
}
interface AggCardinality {
cardinality: {
field: string;
};
}
interface AggTerms {
terms: {
field: string;
@ -597,23 +594,35 @@ export class DataVisualizer {
samplerShardSize: number,
timeFieldName: string,
earliestMs?: number,
latestMs?: number
latestMs?: number,
datafeedConfig?: Datafeed
) {
const index = indexPatternTitle;
const size = 0;
const filterCriteria = buildBaseFilterCriteria(timeFieldName, earliestMs, latestMs, query);
const datafeedAggregations = getDatafeedAggregations(datafeedConfig);
// Value count aggregation faster way of checking if field exists than using
// filter aggregation with exists query.
const aggs: Aggs = {};
const aggs: Aggs = datafeedAggregations !== undefined ? { ...datafeedAggregations } : {};
aggregatableFields.forEach((field, i) => {
const safeFieldName = getSafeAggregationName(field, i);
aggs[`${safeFieldName}_count`] = {
filter: { exists: { field } },
};
aggs[`${safeFieldName}_cardinality`] = {
cardinality: { field },
};
let cardinalityField: AggCardinality;
if (datafeedConfig?.script_fields?.hasOwnProperty(field)) {
cardinalityField = aggs[`${safeFieldName}_cardinality`] = {
cardinality: { script: datafeedConfig?.script_fields[field].script },
};
} else {
cardinalityField = {
cardinality: { field },
};
}
aggs[`${safeFieldName}_cardinality`] = cardinalityField;
});
const searchBody = {
@ -661,10 +670,27 @@ export class DataVisualizer {
},
});
} else {
stats.aggregatableNotExistsFields.push({
fieldName: field,
existsInDocs: false,
});
if (datafeedConfig?.script_fields?.hasOwnProperty(field)) {
const cardinality = get(
aggregations,
[...aggsPath, `${safeFieldName}_cardinality`, 'value'],
0
);
stats.aggregatableExistsFields.push({
fieldName: field,
existsInDocs: true,
stats: {
sampleCount,
count,
cardinality,
},
});
} else {
stats.aggregatableNotExistsFields.push({
fieldName: field,
existsInDocs: false,
});
}
}
});

View file

@ -9,6 +9,10 @@ import { IScopedClusterClient } from 'kibana/server';
import { duration } from 'moment';
import { parseInterval } from '../../../common/util/parse_interval';
import { initCardinalityFieldsCache } from './fields_aggs_cache';
import { AggCardinality } from '../../../common/types/fields';
import { isValidAggregationField } from '../../../common/util/validation_utils';
import { getDatafeedAggregations } from '../../../common/util/datafeed_utils';
import { Datafeed } from '../../../common/types/anomaly_detection_jobs';
/**
* Service for carrying out queries to obtain data
@ -35,14 +39,29 @@ export function fieldsServiceProvider({ asCurrentUser }: IScopedClusterClient) {
*/
async function getAggregatableFields(
index: string | string[],
fieldNames: string[]
fieldNames: string[],
datafeedConfig?: Datafeed
): Promise<string[]> {
const { body } = await asCurrentUser.fieldCaps({
index,
fields: fieldNames,
});
const aggregatableFields: string[] = [];
const datafeedAggregations = getDatafeedAggregations(datafeedConfig);
fieldNames.forEach((fieldName) => {
if (
typeof datafeedConfig?.script_fields === 'object' &&
datafeedConfig.script_fields.hasOwnProperty(fieldName)
) {
aggregatableFields.push(fieldName);
}
if (
datafeedAggregations !== undefined &&
isValidAggregationField(datafeedAggregations, fieldName)
) {
aggregatableFields.push(fieldName);
}
const fieldInfo = body.fields[fieldName];
const typeKeys = fieldInfo !== undefined ? Object.keys(fieldInfo) : [];
if (typeKeys.length > 0) {
@ -67,10 +86,12 @@ export function fieldsServiceProvider({ asCurrentUser }: IScopedClusterClient) {
query: any,
timeFieldName: string,
earliestMs: number,
latestMs: number
latestMs: number,
datafeedConfig?: Datafeed
): Promise<{ [key: string]: number }> {
const aggregatableFields = await getAggregatableFields(index, fieldNames);
const aggregatableFields = await getAggregatableFields(index, fieldNames, datafeedConfig);
// getAggregatableFields doesn't account for scripted or aggregated fields
if (aggregatableFields.length === 0) {
return {};
}
@ -112,10 +133,22 @@ export function fieldsServiceProvider({ asCurrentUser }: IScopedClusterClient) {
mustCriteria.push(query);
}
const aggs = fieldsToAgg.reduce((obj, field) => {
obj[field] = { cardinality: { field } };
return obj;
}, {} as { [field: string]: { cardinality: { field: string } } });
const aggs = fieldsToAgg.reduce(
(obj, field) => {
if (
typeof datafeedConfig?.script_fields === 'object' &&
datafeedConfig.script_fields.hasOwnProperty(field)
) {
obj[field] = { cardinality: { script: datafeedConfig.script_fields[field].script } };
} else {
obj[field] = { cardinality: { field } };
}
return obj;
},
{} as {
[field: string]: AggCardinality;
}
);
const body = {
query: {

View file

@ -27,6 +27,7 @@ import { validateTimeRange, isValidTimeField } from './validate_time_range';
import { validateJobSchema } from '../../routes/schemas/job_validation_schema';
import { CombinedJob } from '../../../common/types/anomaly_detection_jobs';
import type { MlClient } from '../../lib/ml_client';
import { getDatafeedAggregations } from '../../../common/util/datafeed_utils';
export type ValidateJobPayload = TypeOf<typeof validateJobSchema>;
@ -100,6 +101,12 @@ export async function validateJob(
...(await validateModelMemoryLimit(client, mlClient, job, duration))
);
}
// if datafeed has aggregation, require job config to include a valid summary_doc_field_name
const datafeedAggregations = getDatafeedAggregations(job.datafeed_config);
if (datafeedAggregations !== undefined && !job.analysis_config?.summary_count_field_name) {
validationMessages.push({ id: 'missing_summary_count_field_name' });
}
} else {
validationMessages = basicValidation.messages;
validationMessages.push({ id: 'skipped_extended_tests' });

View file

@ -11,6 +11,8 @@ import { validateJobObject } from './validate_job_object';
import { CombinedJob } from '../../../common/types/anomaly_detection_jobs';
import { Detector } from '../../../common/types/anomaly_detection_jobs';
import { MessageId, JobValidationMessage } from '../../../common/constants/messages';
import { isValidAggregationField } from '../../../common/util/validation_utils';
import { getDatafeedAggregations } from '../../../common/util/datafeed_utils';
function isValidCategorizationConfig(job: CombinedJob, fieldName: string): boolean {
return (
@ -66,6 +68,7 @@ const validateFactory = (client: IScopedClusterClient, job: CombinedJob): Valida
const relevantDetectors = detectors.filter((detector) => {
return typeof detector[fieldName] !== 'undefined';
});
const datafeedConfig = job.datafeed_config;
if (relevantDetectors.length > 0) {
try {
@ -78,11 +81,26 @@ const validateFactory = (client: IScopedClusterClient, job: CombinedJob): Valida
index: job.datafeed_config.indices.join(','),
fields: uniqueFieldNames,
});
const datafeedAggregations = getDatafeedAggregations(datafeedConfig);
let aggregatableFieldNames: string[] = [];
// parse fieldCaps to return an array of just the fields which are aggregatable
if (typeof fieldCaps === 'object' && typeof fieldCaps.fields === 'object') {
aggregatableFieldNames = uniqueFieldNames.filter((field) => {
if (
typeof datafeedConfig?.script_fields === 'object' &&
datafeedConfig?.script_fields.hasOwnProperty(field)
) {
return true;
}
// if datafeed has aggregation fields, check recursively if field exist
if (
datafeedAggregations !== undefined &&
isValidAggregationField(datafeedAggregations, field)
) {
return true;
}
if (typeof fieldCaps.fields[field] !== 'undefined') {
const fieldType = Object.keys(fieldCaps.fields[field])[0];
return fieldCaps.fields[field][fieldType].aggregatable;
@ -96,7 +114,10 @@ const validateFactory = (client: IScopedClusterClient, job: CombinedJob): Valida
job.datafeed_config.query,
aggregatableFieldNames,
0,
job.data_description.time_field
job.data_description.time_field,
undefined,
undefined,
datafeedConfig
);
uniqueFieldNames.forEach((uniqueFieldName) => {

View file

@ -65,7 +65,8 @@ export async function validateModelMemoryLimit(
job.data_description.time_field,
duration!.start as number,
duration!.end as number,
true
true,
job.datafeed_config
);
// @ts-expect-error
const mmlEstimateBytes: number = numeral(modelMemoryLimit).value();

View file

@ -7,7 +7,7 @@
import Boom from '@hapi/boom';
import { IScopedClusterClient } from 'kibana/server';
import { TypeOf } from '@kbn/config-schema';
import { AnalysisConfig } from '../../common/types/anomaly_detection_jobs';
import { AnalysisConfig, Datafeed } from '../../common/types/anomaly_detection_jobs';
import { wrapError } from '../client/error_wrapper';
import { RouteInitialization } from '../types';
import {
@ -35,7 +35,15 @@ export function jobValidationRoutes(
mlClient: MlClient,
payload: CalculateModelMemoryLimitPayload
) {
const { analysisConfig, indexPattern, query, timeFieldName, earliestMs, latestMs } = payload;
const {
datafeedConfig,
analysisConfig,
indexPattern,
query,
timeFieldName,
earliestMs,
latestMs,
} = payload;
return calculateModelMemoryLimitProvider(client, mlClient)(
analysisConfig as AnalysisConfig,
@ -43,7 +51,9 @@ export function jobValidationRoutes(
query,
timeFieldName,
earliestMs,
latestMs
latestMs,
undefined,
datafeedConfig as Datafeed
);
}

View file

@ -20,6 +20,7 @@ export const estimateBucketSpanSchema = schema.object({
});
export const modelMemoryLimitSchema = schema.object({
datafeedConfig: datafeedConfigSchema,
analysisConfig: analysisConfigSchema,
indexPattern: schema.string(),
query: schema.any(),

View file

@ -13188,7 +13188,6 @@
"xpack.ml.newJob.wizard.pickFieldsStep.stoppedPartitionsErrorCallout": "停止したパーティションのリストの取得中にエラーが発生しました。",
"xpack.ml.newJob.wizard.pickFieldsStep.stoppedPartitionsExistCallout": "パーティション単位の分類とstop_on_warn設定が有効です。ジョブ「{jobId}」の一部のパーティションは分類に適さず、さらなる分類または異常検知分析から除外されました。",
"xpack.ml.newJob.wizard.pickFieldsStep.stoppedPartitionsPreviewColumnName": "停止したパーティション名",
"xpack.ml.newJob.wizard.pickFieldsStep.summaryCountField.description": "オプション。インプットデータが事前にまとめられている場合に使用、例: \\{docCountParam\\}。",
"xpack.ml.newJob.wizard.pickFieldsStep.summaryCountField.title": "サマリーカウントフィールド",
"xpack.ml.newJob.wizard.previewJsonButton": "JSON をプレビュー",
"xpack.ml.newJob.wizard.previousStepButton": "前へ",

View file

@ -13202,7 +13202,6 @@
"xpack.ml.newJob.wizard.pickFieldsStep.stoppedPartitionsErrorCallout": "提取已停止分区的列表时发生错误。",
"xpack.ml.newJob.wizard.pickFieldsStep.stoppedPartitionsExistCallout": "启用按分区分类和 stop_on_warn 设置。作业“{jobId}”中的某些分区不适合进行分类,已从进一步分类或异常检测分析中排除。",
"xpack.ml.newJob.wizard.pickFieldsStep.stoppedPartitionsPreviewColumnName": "已停止的分区名称",
"xpack.ml.newJob.wizard.pickFieldsStep.summaryCountField.description": "可选,用于输入数据已预汇总时,例如 \\{docCountParam\\}。",
"xpack.ml.newJob.wizard.pickFieldsStep.summaryCountField.title": "汇总计数字段",
"xpack.ml.newJob.wizard.previewJsonButton": "预览 JSON",
"xpack.ml.newJob.wizard.previousStepButton": "上一页",

View file

@ -303,6 +303,12 @@ export default ({ getService }: FtrProviderContext) => {
url: `https://www.elastic.co/guide/en/machine-learning/${pkg.branch}/create-jobs.html#model-memory-limits`,
status: 'warning',
},
{
id: 'missing_summary_count_field_name',
status: 'error',
text:
'A job configured with a datafeed with aggregations must set summary_count_field_name; use doc_count or suitable alternative.',
},
];
expect(body.length).to.eql(