[ML] Update cloning for jobs to use exclude_generated (#88898)

This commit is contained in:
Quynh Nguyen 2021-02-01 13:06:37 -06:00 committed by GitHub
parent 51cfa90dc5
commit 2f54078aed
17 changed files with 206 additions and 104 deletions

View file

@ -4,7 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { cloneDeep } from 'lodash';
import { Datafeed } from './datafeed';
import { DatafeedStats } from './datafeed_stats';
import { Job } from './job';
@ -25,16 +24,6 @@ export interface CombinedJobWithStats extends JobWithStats {
datafeed_config: DatafeedWithStats;
}
export function expandCombinedJobConfig(combinedJob: CombinedJob) {
const combinedJobClone = cloneDeep(combinedJob);
const job = combinedJobClone;
const datafeed = combinedJobClone.datafeed_config;
// @ts-expect-error
delete job.datafeed_config;
return { job, datafeed };
}
export function isCombinedJobWithStats(arg: any): arg is CombinedJobWithStats {
return typeof arg.job_id === 'string';
}

View file

@ -62,7 +62,7 @@ export const Page: FC<Props> = ({ jobId }) => {
if (currentIndexPattern) {
(async function () {
if (jobId !== undefined) {
const analyticsConfigs = await ml.dataFrameAnalytics.getDataFrameAnalytics(jobId);
const analyticsConfigs = await ml.dataFrameAnalytics.getDataFrameAnalytics(jobId, true);
if (
Array.isArray(analyticsConfigs.data_frame_analytics) &&
analyticsConfigs.data_frame_analytics.length > 0

View file

@ -310,9 +310,6 @@ export type CloneDataFrameAnalyticsConfig = Omit<
*/
export function extractCloningConfig({
id,
version,
// eslint-disable-next-line @typescript-eslint/naming-convention
create_time,
...configToClone
}: DeepReadonly<DataFrameAnalyticsConfig>): CloneDataFrameAnalyticsConfig {
return (cloneDeep({

View file

@ -36,6 +36,23 @@ export function loadFullJob(jobId) {
});
}
export function loadJobForCloning(jobId) {
return new Promise((resolve, reject) => {
ml.jobs
.jobForCloning(jobId)
.then((resp) => {
if (resp) {
resolve(resp);
} else {
throw new Error(`Could not find job ${jobId}`);
}
})
.catch((error) => {
reject(error);
});
});
}
export function isStartable(jobs) {
return jobs.some(
(j) => j.datafeedState === DATAFEED_STATE.STOPPED && j.jobState !== JOB_STATE.CLOSING
@ -180,31 +197,38 @@ function showResults(resp, action) {
export async function cloneJob(jobId) {
try {
const job = await loadFullJob(jobId);
if (job.custom_settings && job.custom_settings.created_by) {
const [{ job: cloneableJob, datafeed }, originalJob] = await Promise.all([
loadJobForCloning(jobId),
loadFullJob(jobId, false),
]);
if (cloneableJob !== undefined && originalJob?.custom_settings?.created_by !== undefined) {
// if the job is from a wizard, i.e. contains a created_by property
// use tempJobCloningObjects to temporarily store the job
mlJobService.tempJobCloningObjects.job = job;
mlJobService.tempJobCloningObjects.createdBy = originalJob?.custom_settings?.created_by;
mlJobService.tempJobCloningObjects.job = cloneableJob;
if (
job.data_counts.earliest_record_timestamp !== undefined &&
job.data_counts.latest_record_timestamp !== undefined &&
job.data_counts.latest_bucket_timestamp !== undefined
originalJob.data_counts.earliest_record_timestamp !== undefined &&
originalJob.data_counts.latest_record_timestamp !== undefined &&
originalJob.data_counts.latest_bucket_timestamp !== undefined
) {
// if the job has run before, use the earliest and latest record timestamp
// as the cloned job's time range
let start = job.data_counts.earliest_record_timestamp;
let end = job.data_counts.latest_record_timestamp;
let start = originalJob.data_counts.earliest_record_timestamp;
let end = originalJob.data_counts.latest_record_timestamp;
if (job.datafeed_config.aggregations !== undefined) {
if (originalJob.datafeed_config.aggregations !== undefined) {
// if the datafeed uses aggregations, the earliest and latest record timestamps may not be the same
// as the start and end of the data in the index.
const bucketSpanMs = parseInterval(job.analysis_config.bucket_span).asMilliseconds();
const bucketSpanMs = parseInterval(
originalJob.analysis_config.bucket_span
).asMilliseconds();
// round down to the start of the nearest bucket
start =
Math.floor(job.data_counts.earliest_record_timestamp / bucketSpanMs) * bucketSpanMs;
Math.floor(originalJob.data_counts.earliest_record_timestamp / bucketSpanMs) *
bucketSpanMs;
// use latest_bucket_timestamp and add two bucket spans minus one ms
end = job.data_counts.latest_bucket_timestamp + bucketSpanMs * 2 - 1;
end = originalJob.data_counts.latest_bucket_timestamp + bucketSpanMs * 2 - 1;
}
mlJobService.tempJobCloningObjects.start = start;
@ -212,12 +236,17 @@ export async function cloneJob(jobId) {
}
} else {
// otherwise use the tempJobCloningObjects
mlJobService.tempJobCloningObjects.job = job;
mlJobService.tempJobCloningObjects.job = cloneableJob;
// resets the createdBy field in case it still retains previous settings
mlJobService.tempJobCloningObjects.createdBy = undefined;
}
if (datafeed !== undefined) {
mlJobService.tempJobCloningObjects.datafeed = datafeed;
}
if (job.calendars) {
if (originalJob.calendars) {
mlJobService.tempJobCloningObjects.calendars = await mlCalendarService.fetchCalendarsByIds(
job.calendars
originalJob.calendars
);
}
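
To illustrate the time-range arithmetic above, a worked sketch with made-up timestamps (example values, not from the commit):

const bucketSpanMs = 60000; // a bucket_span of "1m"
// round down to the start of the nearest bucket
const start = Math.floor(10000500 / bucketSpanMs) * bucketSpanMs; // 9960000
// two bucket spans past the latest bucket timestamp, minus one ms
const end = 10020000 + bucketSpanMs * 2 - 1; // 10139999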

View file

@ -8,7 +8,7 @@ import { ApplicationStart } from 'kibana/public';
import { IndexPatternsContract } from '../../../../../../../../../src/plugins/data/public';
import { mlJobService } from '../../../../services/job_service';
import { loadIndexPatterns, getIndexPatternIdFromName } from '../../../../util/index_utils';
import { CombinedJob } from '../../../../../../common/types/anomaly_detection_jobs';
import { Datafeed, Job } from '../../../../../../common/types/anomaly_detection_jobs';
import { CREATED_BY_LABEL, JOB_TYPE } from '../../../../../../common/constants/new_job';
export async function preConfiguredJobRedirect(
@ -16,11 +16,11 @@ export async function preConfiguredJobRedirect(
basePath: string,
navigateToUrl: ApplicationStart['navigateToUrl']
) {
const { job } = mlJobService.tempJobCloningObjects;
if (job) {
const { createdBy, job, datafeed } = mlJobService.tempJobCloningObjects;
if (job && datafeed) {
try {
await loadIndexPatterns(indexPatterns);
const redirectUrl = getWizardUrlFromCloningJob(job);
const redirectUrl = getWizardUrlFromCloningJob(createdBy, job, datafeed);
await navigateToUrl(`${basePath}/app/ml/${redirectUrl}`);
return Promise.reject();
} catch (error) {
@ -33,8 +33,8 @@ export async function preConfiguredJobRedirect(
}
}
function getWizardUrlFromCloningJob(job: CombinedJob) {
const created = job?.custom_settings?.created_by;
function getWizardUrlFromCloningJob(createdBy: string | undefined, job: Job, datafeed: Datafeed) {
const created = createdBy;
let page = '';
switch (created) {
@ -55,7 +55,7 @@ function getWizardUrlFromCloningJob(job: CombinedJob) {
break;
}
const indexPatternId = getIndexPatternIdFromName(job.datafeed_config.indices.join());
const indexPatternId = getIndexPatternIdFromName(datafeed.indices.join());
return `jobs/new_job/${page}?index=${indexPatternId}&_g=()`;
}
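
For illustration, assuming the single-metric wizard's created_by label maps to the single_metric page and the datafeed's indices resolve to an index pattern with id 'abc-123' (both hypothetical), the URL built above would be:

jobs/new_job/single_metric?index=abc-123&_g=()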

View file

@ -37,7 +37,6 @@ import { useMlContext } from '../../../../contexts/ml';
import { getTimeFilterRange } from '../../../../components/full_time_range_selector';
import { getTimeBucketsFromCache } from '../../../../util/time_buckets';
import { ExistingJobsAndGroups, mlJobService } from '../../../../services/job_service';
import { expandCombinedJobConfig } from '../../../../../../common/types/anomaly_detection_jobs';
import { newJobCapsService } from '../../../../services/new_job_capabilities_service';
import { EVENT_RATE_FIELD_ID } from '../../../../../../common/types/fields';
import { getNewJobDefaults } from '../../../../services/ml_server_info';
@ -74,10 +73,11 @@ export const Page: FC<PageProps> = ({ existingJobsAndGroups, jobType }) => {
if (mlJobService.tempJobCloningObjects.job !== undefined) {
// cloning a job
const clonedJob = mlJobService.cloneJob(mlJobService.tempJobCloningObjects.job);
const { job, datafeed } = expandCombinedJobConfig(clonedJob);
const clonedJob = mlJobService.tempJobCloningObjects.job;
const clonedDatafeed = mlJobService.cloneDatafeed(mlJobService.tempJobCloningObjects.datafeed);
initCategorizationSettings();
jobCreator.cloneFromExistingJob(job, datafeed);
jobCreator.cloneFromExistingJob(clonedJob, clonedDatafeed);
// if we're not skipping the time range, this is a standard job clone, so wipe the jobId
if (mlJobService.tempJobCloningObjects.skipTimeRangeStep === false) {

View file

@ -6,7 +6,7 @@
import { SearchResponse } from 'elasticsearch';
import { TimeRange } from 'src/plugins/data/common/query/timefilter/types';
import { CombinedJob } from '../../../common/types/anomaly_detection_jobs';
import { CombinedJob, Datafeed } from '../../../common/types/anomaly_detection_jobs';
import { Calendar } from '../../../common/types/calendars';
export interface ExistingJobsAndGroups {
@ -18,6 +18,8 @@ declare interface JobService {
jobs: CombinedJob[];
createResultsUrlForJobs: (jobs: any[], target: string, timeRange?: TimeRange) => string;
tempJobCloningObjects: {
createdBy?: string;
datafeed?: Datafeed;
job: any;
skipTimeRangeStep: boolean;
start?: number;
@ -26,7 +28,7 @@ declare interface JobService {
};
skipTimeRangeStep: boolean;
saveNewJob(job: any): Promise<any>;
cloneJob(job: any): any;
cloneDatafeed(datafeed: any): Datafeed;
openJob(jobId: string): Promise<any>;
saveNewDatafeed(datafeedConfig: any, jobId: string): Promise<any>;
startDatafeed(

View file

@ -28,6 +28,8 @@ class JobService {
// if populated when loading the job management page, the start datafeed modal
// is automatically opened.
this.tempJobCloningObjects = {
createdBy: undefined,
datafeed: undefined,
job: undefined,
skipTimeRangeStep: false,
start: undefined,
@ -325,67 +327,15 @@ class JobService {
return ml.addJob({ jobId: job.job_id, job }).then(func).catch(func);
}
cloneJob(job) {
// create a deep copy of a job object
// also remove items from the job which are set by the server and not needed
// in the future this formatting could be optional
const tempJob = cloneDeep(job);
// remove all of the items which should not be copied
// such as counts, state and times
delete tempJob.state;
delete tempJob.job_version;
delete tempJob.data_counts;
delete tempJob.create_time;
delete tempJob.finished_time;
delete tempJob.last_data_time;
delete tempJob.model_size_stats;
delete tempJob.node;
delete tempJob.average_bucket_processing_time_ms;
delete tempJob.model_snapshot_id;
delete tempJob.open_time;
delete tempJob.established_model_memory;
delete tempJob.calendars;
delete tempJob.timing_stats;
delete tempJob.forecasts_stats;
delete tempJob.assignment_explanation;
delete tempJob.analysis_config.use_per_partition_normalization;
each(tempJob.analysis_config.detectors, (d) => {
delete d.detector_index;
});
cloneDatafeed(datafeed) {
const tempDatafeed = cloneDeep(datafeed);
// remove parts of the datafeed config which should not be copied
if (tempJob.datafeed_config) {
delete tempJob.datafeed_config.datafeed_id;
delete tempJob.datafeed_config.job_id;
delete tempJob.datafeed_config.state;
delete tempJob.datafeed_config.node;
delete tempJob.datafeed_config.timing_stats;
delete tempJob.datafeed_config.assignment_explanation;
// remove query_delay if it's between 60s and 120s
// the back-end produces a random value between 60 and 120, so
// deleting it makes the back-end generate a new random value
if (tempJob.datafeed_config.query_delay) {
const interval = parseInterval(tempJob.datafeed_config.query_delay);
if (interval !== null) {
const queryDelay = interval.asSeconds();
if (queryDelay > 60 && queryDelay < 120) {
delete tempJob.datafeed_config.query_delay;
}
}
}
if (tempDatafeed) {
delete tempDatafeed.datafeed_id;
delete tempDatafeed.job_id;
}
// when jumping from a wizard to the advanced job creation,
// the wizard's created_by information should be stripped.
if (tempJob.custom_settings && tempJob.custom_settings.created_by) {
delete tempJob.custom_settings.created_by;
}
return tempJob;
return tempDatafeed;
}
// find a job based on the id
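
A minimal sketch of the new cloneDatafeed behaviour, using a made-up datafeed config: the server-assigned ids are stripped and everything else survives the deep copy.

const cloned = mlJobService.cloneDatafeed({
datafeed_id: 'datafeed-my_job', // removed by cloneDatafeed
job_id: 'my_job', // removed by cloneDatafeed
indices: ['my-index'],
});
// cloned => { indices: ['my-index'] }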

View file

@ -52,11 +52,12 @@ interface JobsExistsResponse {
}
export const dataFrameAnalytics = {
getDataFrameAnalytics(analyticsId?: string) {
getDataFrameAnalytics(analyticsId?: string, excludeGenerated?: boolean) {
const analyticsIdString = analyticsId !== undefined ? `/${analyticsId}` : '';
return http<GetDataFrameAnalyticsResponse>({
path: `${basePath()}/data_frame/analytics${analyticsIdString}`,
method: 'GET',
...(excludeGenerated ? { query: { excludeGenerated } } : {}),
});
},
getDataFrameAnalyticsStats(analyticsId?: string) {

View file

@ -13,6 +13,8 @@ import {
MlJobWithTimeRange,
MlSummaryJobs,
CombinedJobWithStats,
Job,
Datafeed,
} from '../../../../common/types/anomaly_detection_jobs';
import { JobMessage } from '../../../../common/types/audit_message';
import { AggFieldNamePair } from '../../../../common/types/fields';
@ -48,6 +50,15 @@ export const jobsApiProvider = (httpService: HttpService) => ({
});
},
jobForCloning(jobId: string) {
const body = JSON.stringify({ jobId });
return httpService.http<{ job?: Job; datafeed?: Datafeed } | undefined>({
path: `${basePath()}/jobs/job_for_cloning`,
method: 'POST',
body,
});
},
jobs(jobIds: string[]) {
const body = JSON.stringify({ jobIds });
return httpService.http<CombinedJobWithStats[]>({
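
A caller-side sketch of the new endpoint (hypothetical job id; the response may be undefined if nothing matches):

const resp = await jobsApiProvider(httpService).jobForCloning('my_job');
// resp => { job: {...}, datafeed: {...} } with generated fields already excluded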

View file

@ -160,11 +160,55 @@ export function datafeedsProvider(mlClient: MlClient) {
}, {} as { [id: string]: string });
}
async function getDatafeedByJobId(
jobId: string,
excludeGenerated?: boolean
): Promise<Datafeed | undefined> {
async function findDatafeed() {
// if the job doesn't use the standard datafeedId format,
// get all the datafeeds and match them against the jobId
const {
body: { datafeeds },
} = await mlClient.getDatafeeds<MlDatafeedsResponse>(
excludeGenerated ? { exclude_generated: true } : {}
);
for (const result of datafeeds) {
if (result.job_id === jobId) {
return result;
}
}
}
// if the job was created by the wizard,
// then we can assume it uses the standard format of the datafeedId
const assumedDefaultDatafeedId = `datafeed-${jobId}`;
try {
const {
body: { datafeeds: datafeedsResults },
} = await mlClient.getDatafeeds<MlDatafeedsResponse>({
datafeed_id: assumedDefaultDatafeedId,
...(excludeGenerated ? { exclude_generated: true } : {}),
});
if (
Array.isArray(datafeedsResults) &&
datafeedsResults.length === 1 &&
datafeedsResults[0].job_id === jobId
) {
return datafeedsResults[0];
} else {
return await findDatafeed();
}
} catch (e) {
// if assumedDefaultDatafeedId does not exist, ES will throw an error
return await findDatafeed();
}
}
return {
forceStartDatafeeds,
stopDatafeeds,
forceDeleteDatafeed,
getDatafeedIdsByJobId,
getJobIdsByDatafeedId,
getDatafeedByJobId,
};
}
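
Usage sketch (assumed job id): the fast path fetches `datafeed-my_job` directly; if that id doesn't exist or belongs to a different job, findDatafeed falls back to scanning all datafeeds.

const datafeed = await getDatafeedByJobId('my_job', true);
// => the matching Datafeed with generated fields excluded, or undefined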

View file

@ -18,6 +18,8 @@ import {
AuditMessage,
DatafeedWithStats,
CombinedJobWithStats,
Datafeed,
Job,
} from '../../../common/types/anomaly_detection_jobs';
import {
MlJobsResponse,
@ -47,7 +49,9 @@ interface Results {
export function jobsProvider(client: IScopedClusterClient, mlClient: MlClient) {
const { asInternalUser } = client;
const { forceDeleteDatafeed, getDatafeedIdsByJobId } = datafeedsProvider(mlClient);
const { forceDeleteDatafeed, getDatafeedIdsByJobId, getDatafeedByJobId } = datafeedsProvider(
mlClient
);
const { getAuditMessagesSummary } = jobAuditMessagesProvider(client, mlClient);
const { getLatestBucketTimestampByJob } = resultsServiceProvider(mlClient);
const calMngr = new CalendarManager(mlClient);
@ -257,6 +261,25 @@ export function jobsProvider(client: IScopedClusterClient, mlClient: MlClient) {
return { jobs, jobsMap };
}
async function getJobForCloning(jobId: string) {
const [{ body: jobResults }, datafeedResult] = await Promise.all([
mlClient.getJobs<MlJobsResponse>({ job_id: jobId, exclude_generated: true }),
getDatafeedByJobId(jobId, true),
]);
const result: { datafeed?: Datafeed; job?: Job } = { job: undefined, datafeed: undefined };
if (datafeedResult && datafeedResult.job_id === jobId) {
result.datafeed = datafeedResult;
}
if (jobResults && jobResults.jobs) {
const job = jobResults.jobs.find((j) => j.job_id === jobId);
if (job) {
result.job = job;
}
}
return result;
}
async function createFullJobsList(jobIds: string[] = []) {
const jobs: CombinedJobWithStats[] = [];
const groups: { [jobId: string]: string[] } = {};
@ -265,6 +288,7 @@ export function jobsProvider(client: IScopedClusterClient, mlClient: MlClient) {
const globalCalendars: string[] = [];
const jobIdsString = jobIds.join();
const [
{ body: jobResults },
{ body: jobStatsResults },
@ -502,6 +526,7 @@ export function jobsProvider(client: IScopedClusterClient, mlClient: MlClient) {
forceStopAndCloseJob,
jobsSummary,
jobsWithTimerange,
getJobForCloning,
createFullJobsList,
deletingJobTasks,
jobsExist,

View file

@ -73,6 +73,7 @@
"CloseJobs",
"JobsSummary",
"JobsWithTimeRange",
"GetJobForCloning",
"CreateFullJobsList",
"GetAllGroups",
"JobsExist",

View file

@ -19,6 +19,7 @@ import {
stopsDataFrameAnalyticsJobQuerySchema,
deleteDataFrameAnalyticsJobSchema,
jobsExistSchema,
analyticsQuerySchema,
} from './schemas/data_analytics_schema';
import { GetAnalyticsMapArgs, ExtendAnalyticsMapArgs } from '../models/data_frame_analytics/types';
import { IndexPatternHandler } from '../models/data_frame_analytics/index_patterns';
@ -102,7 +103,9 @@ export function dataFrameAnalyticsRoutes({ router, mlLicense, routeGuard }: Rout
},
routeGuard.fullLicenseAPIGuard(async ({ mlClient, response }) => {
try {
const { body } = await mlClient.getDataFrameAnalytics({ size: 1000 });
const { body } = await mlClient.getDataFrameAnalytics({
size: 1000,
});
return response.ok({
body,
});
@ -126,6 +129,7 @@ export function dataFrameAnalyticsRoutes({ router, mlLicense, routeGuard }: Rout
path: '/api/ml/data_frame/analytics/{analyticsId}',
validate: {
params: analyticsIdSchema,
query: analyticsQuerySchema,
},
options: {
tags: ['access:ml:canGetDataFrameAnalytics'],
@ -134,8 +138,11 @@ export function dataFrameAnalyticsRoutes({ router, mlLicense, routeGuard }: Rout
routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response }) => {
try {
const { analyticsId } = request.params;
const { excludeGenerated } = request.query;
const { body } = await mlClient.getDataFrameAnalytics({
id: analyticsId,
...(excludeGenerated ? { exclude_generated: true } : {}),
});
return response.ok({
body,
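
As a request-level illustration (hypothetical analytics id), the new query parameter is passed through to ES as exclude_generated:

GET /api/ml/data_frame/analytics/my-analytics-job?excludeGenerated=true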

View file

@ -272,6 +272,40 @@ export function jobServiceRoutes({ router, routeGuard }: RouteInitialization) {
})
);
/**
* @apiGroup JobService
*
* @api {post} /api/ml/jobs/job_for_cloning Get job for cloning
* @apiName GetJobForCloning
* @apiDescription Get the job and datafeed configuration for cloning, with auto-generated fields excluded
*
* @apiSchema (body) jobIdSchema
*/
router.post(
{
path: '/api/ml/jobs/job_for_cloning',
validate: {
body: jobIdSchema,
},
options: {
tags: ['access:ml:canGetJobs'],
},
},
routeGuard.fullLicenseAPIGuard(async ({ client, mlClient, request, response }) => {
try {
const { getJobForCloning } = jobServiceProvider(client, mlClient);
const { jobId } = request.body;
const resp = await getJobForCloning(jobId);
return response.ok({
body: resp,
});
} catch (e) {
return response.customError(wrapError(e));
}
})
);
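
A request sketch for the new route (hypothetical job id; the response shape follows getJobForCloning above):

POST /api/ml/jobs/job_for_cloning
{ "jobId": "my_job" }
// 200 => { "job": { ... }, "datafeed": { ... } }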
/**
* @apiGroup JobService
*

View file

@ -64,6 +64,13 @@ export const analyticsIdSchema = schema.object({
analyticsId: schema.string(),
});
export const analyticsQuerySchema = schema.object({
/**
* Optional flag to exclude automatically generated fields from the returned config.
*/
excludeGenerated: schema.maybe(schema.boolean()),
});
export const deleteDataFrameAnalyticsJobSchema = schema.object({
/**
* Analytics Destination Index

View file

@ -39,6 +39,11 @@ export const forceStartDatafeedSchema = schema.object({
end: schema.maybe(schema.number()),
});
export const jobIdSchema = schema.object({
/** Job ID. */
jobId: schema.maybe(schema.string()),
});
export const jobIdsSchema = schema.object({
/** Optional list of job IDs. */
jobIds: schema.maybe(schema.arrayOf(schema.maybe(schema.string()))),