[ML] Fixing saved object sync in a serverless environment when apis are missing part 2 (#158749)

Follow on from https://github.com/elastic/kibana/pull/156585

Temporarily fixes https://github.com/elastic/kibana/issues/156500

With the ability to run a serverless elasticsearch locally, it was now
possible to debug and fix the remaining bugs.

The try/catches added in these PRs will probably be removed once we have
the ability to know what apis we are allowed to call in the serverless
project.
This commit is contained in:
James Gowdy 2023-06-06 09:14:29 +01:00 committed by GitHub
parent 1b04f5938a
commit 73226d5d70
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 124 additions and 101 deletions

View file

@ -106,10 +106,22 @@ export function jobSavedObjectsInitializationFactory(
// index: '.ml-config',
// });
// return body.count > 0;
let adJobsCount = 0;
let dfaJobsCount = 0;
const adJobs = await client.asInternalUser.ml.getJobs();
const dfaJobs = await client.asInternalUser.ml.getDataFrameAnalytics();
return adJobs.count > 0 || dfaJobs.count > 0;
try {
const adJobs = await client.asInternalUser.ml.getJobs();
adJobsCount = adJobs.count;
} catch (error) {
mlLog.debug(`Error fetching ML anomaly detection jobs ${JSON.stringify(error)}`);
}
try {
const dfaJobs = await client.asInternalUser.ml.getDataFrameAnalytics();
dfaJobsCount = dfaJobs.count;
} catch (error) {
mlLog.debug(`Error fetching ML data frame analytics jobs ${JSON.stringify(error)}`);
}
return adJobsCount > 0 || dfaJobsCount > 0;
}
return { initializeJobs };

View file

@ -16,7 +16,11 @@ import type {
} from '../../common/types/saved_objects';
import { checksFactory } from './checks';
import type { JobStatus } from './checks';
import { getSavedObjectClientError, getJobDetailsFromTrainedModel } from './util';
import {
getSavedObjectClientError,
getJobDetailsFromTrainedModel,
mlFunctionsFactory,
} from './util';
export interface JobSpaceOverrides {
overrides: {
@ -38,9 +42,11 @@ export function syncSavedObjectsFactory(
datafeedsRemoved: {},
};
const { getDatafeeds, getTrainedModels } = mlFunctionsFactory(client);
const [datafeeds, models, status] = await Promise.all([
client.asInternalUser.ml.getDatafeeds(),
client.asInternalUser.ml.getTrainedModels(),
getDatafeeds(),
getTrainedModels(),
checkStatus(),
]);
@ -107,40 +113,42 @@ export function syncSavedObjectsFactory(
}
}
for (const model of status.jobs['trained-model']) {
if (model.checks.savedObjectExits === false) {
const { modelId } = model;
const type = 'trained-model';
if (results.savedObjectsCreated[type] === undefined) {
results.savedObjectsCreated[type] = {};
}
if (simulate === true) {
results.savedObjectsCreated[type]![modelId] = { success: true };
} else {
// create model saved objects for models which are missing them
tasks.push(async () => {
try {
const mod = models.trained_model_configs.find((m) => m.model_id === modelId);
if (mod === undefined) {
if (models !== null) {
for (const model of status.jobs['trained-model']) {
if (model.checks.savedObjectExits === false) {
const { modelId } = model;
const type = 'trained-model';
if (results.savedObjectsCreated[type] === undefined) {
results.savedObjectsCreated[type] = {};
}
if (simulate === true) {
results.savedObjectsCreated[type]![modelId] = { success: true };
} else {
// create model saved objects for models which are missing them
tasks.push(async () => {
try {
const mod = models.trained_model_configs.find((m) => m.model_id === modelId);
if (mod === undefined) {
results.savedObjectsCreated[type]![modelId] = {
success: false,
error: `trained model ${modelId} not found`,
};
return;
}
const job = getJobDetailsFromTrainedModel(mod);
await mlSavedObjectService.createTrainedModel(modelId, job);
results.savedObjectsCreated[type]![modelId] = {
success: true,
};
} catch (error) {
results.savedObjectsCreated[type]![modelId] = {
success: false,
error: `trained model ${modelId} not found`,
};
return;
}
const job = getJobDetailsFromTrainedModel(mod);
await mlSavedObjectService.createTrainedModel(modelId, job);
results.savedObjectsCreated[type]![modelId] = {
success: true,
};
} catch (error) {
results.savedObjectsCreated[type]![modelId] = {
success: false,
error: getSavedObjectClientError(error),
};
}
});
error: getSavedObjectClientError(error),
};
}
});
}
}
}
}
@ -242,71 +250,74 @@ export function syncSavedObjectsFactory(
}
}
for (const job of status.savedObjects['anomaly-detector']) {
const type = 'anomaly-detector';
if (
(job.checks.datafeedExists === true && job.datafeedId === null) ||
(job.checks.datafeedExists === true &&
job.datafeedId !== null &&
adJobsById[job.jobId] &&
adJobsById[job.jobId].datafeedId !== job.datafeedId)
) {
if (results.datafeedsAdded[type] === undefined) {
results.datafeedsAdded[type] = {};
}
// add datafeed id for jobs where the datafeed exists but the id is missing from the saved object
// or if the datafeed id in the saved object is not the same as the one attached to the job in es
if (simulate === true) {
results.datafeedsAdded[type]![job.jobId] = { success: true };
} else {
const df = datafeeds.datafeeds.find((d) => d.job_id === job.jobId);
const jobId = job.jobId;
const datafeedId = df?.datafeed_id;
if (datafeeds !== null) {
for (const job of status.savedObjects['anomaly-detector']) {
const type = 'anomaly-detector';
if (
(job.checks.datafeedExists === true && job.datafeedId === null) ||
(job.checks.datafeedExists === true &&
job.datafeedId !== null &&
adJobsById[job.jobId] &&
adJobsById[job.jobId].datafeedId !== job.datafeedId)
) {
if (results.datafeedsAdded[type] === undefined) {
results.datafeedsAdded[type] = {};
}
// add datafeed id for jobs where the datafeed exists but the id is missing from the saved object
// or if the datafeed id in the saved object is not the same as the one attached to the job in es
if (simulate === true) {
results.datafeedsAdded[type]![job.jobId] = { success: true };
} else {
const df = datafeeds.datafeeds.find((d) => d.job_id === job.jobId);
const jobId = job.jobId;
const datafeedId = df?.datafeed_id;
tasks.push(async () => {
try {
if (datafeedId !== undefined) {
await mlSavedObjectService.addDatafeed(datafeedId, jobId);
tasks.push(async () => {
try {
if (datafeedId !== undefined) {
await mlSavedObjectService.addDatafeed(datafeedId, jobId);
}
results.datafeedsAdded[type]![job.jobId] = { success: true };
} catch (error) {
results.datafeedsAdded[type]![job.jobId] = {
success: false,
error: getSavedObjectClientError(error),
};
}
results.datafeedsAdded[type]![job.jobId] = { success: true };
} catch (error) {
results.datafeedsAdded[type]![job.jobId] = {
success: false,
});
}
} else if (
job.checks.jobExists === true &&
job.checks.datafeedExists === false &&
job.datafeedId !== null &&
job.datafeedId !== undefined
) {
if (results.datafeedsRemoved[type] === undefined) {
results.datafeedsRemoved[type] = {};
}
// remove datafeed id for jobs where the datafeed no longer exists but the id is populated in the saved object
if (simulate === true) {
results.datafeedsRemoved[type]![job.jobId] = { success: true };
} else {
const datafeedId = job.datafeedId;
tasks.push(async () => {
try {
await mlSavedObjectService.deleteDatafeed(datafeedId);
results.datafeedsRemoved[type]![job.jobId] = { success: true };
} catch (error) {
results.datafeedsRemoved[type]![job.jobId] = {
success: false,
error: getSavedObjectClientError(error),
};
}
});
}
} else if (
job.checks.jobExists === true &&
job.checks.datafeedExists === false &&
job.datafeedId !== null &&
job.datafeedId !== undefined
) {
if (results.datafeedsRemoved[type] === undefined) {
results.datafeedsRemoved[type] = {};
}
// remove datafeed id for jobs where the datafeed no longer exists but the id is populated in the saved object
if (simulate === true) {
results.datafeedsRemoved[type]![job.jobId] = { success: true };
} else {
const datafeedId = job.datafeedId;
tasks.push(async () => {
try {
await mlSavedObjectService.deleteDatafeed(datafeedId);
results.datafeedsRemoved[type]![job.jobId] = { success: true };
} catch (error) {
results.datafeedsRemoved[type]![job.jobId] = {
success: false,
error: getSavedObjectClientError(error),
};
}
});
error: getSavedObjectClientError(error),
};
}
});
}
}
}
}
await Promise.allSettled(tasks.map((t) => t()));
return results;
}

View file

@ -68,32 +68,32 @@ export function getJobDetailsFromTrainedModel(
* we return null.
*/
function mlFunctionsFactory(client: IScopedClusterClient) {
export function mlFunctionsFactory(client: IScopedClusterClient) {
return {
async getJobs() {
try {
return client.asInternalUser.ml.getJobs();
return await client.asInternalUser.ml.getJobs();
} catch (error) {
return null;
}
},
async getDatafeeds() {
try {
return client.asInternalUser.ml.getDatafeeds();
return await client.asInternalUser.ml.getDatafeeds();
} catch (error) {
return null;
}
},
async getTrainedModels() {
try {
return client.asInternalUser.ml.getTrainedModels();
return await client.asInternalUser.ml.getTrainedModels();
} catch (error) {
return null;
}
},
async getDataFrameAnalytics() {
try {
return client.asInternalUser.ml.getDataFrameAnalytics();
return await client.asInternalUser.ml.getDataFrameAnalytics();
} catch (error) {
return null;
}