[ML] job saved objects initialization (#82639)

* [ML] job saved objects initialization

* fixing job count logic

* adding missing files

* attempting to fix build crash

* fixing kibana.json

* changes based on review

* removing accidentally added export

* adding intialization promise

* use finally so errors dont stop initialization

* function rename

* removing duplicate header

* adding job initialization count to log message

* adding error to log message

* moving initialization file

* moving intialization file back again to fix git stash issues

* removing .kibana index search

* creating internal saved object client

* code clean up

* removing commented code

* adding check for spaces enabled

* adding ids to saved objects

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
James Gowdy 2020-11-10 21:02:09 +00:00 committed by GitHub
parent dc287eaadc
commit 096acb4da8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 557 additions and 332 deletions

View file

@ -6,6 +6,7 @@
import { KibanaRequest } from 'kibana/server';
import { PLUGIN_ID } from '../constants/app';
import { ML_SAVED_OBJECT_TYPE } from './saved_objects';
export const apmUserMlCapabilities = {
canGetJobs: false,
@ -78,7 +79,13 @@ export function getPluginPrivileges() {
const adminMlCapabilitiesKeys = Object.keys(adminMlCapabilities);
const allMlCapabilitiesKeys = [...adminMlCapabilitiesKeys, ...userMlCapabilitiesKeys];
// TODO: include ML in base privileges for the `8.0` release: https://github.com/elastic/kibana/issues/71422
const savedObjects = ['index-pattern', 'dashboard', 'search', 'visualization', 'ml-job'];
const savedObjects = [
'index-pattern',
'dashboard',
'search',
'visualization',
ML_SAVED_OBJECT_TYPE,
];
const privilege = {
app: [PLUGIN_ID, 'kibana'],
excludeFromBasePrivileges: true,

View file

@ -5,3 +5,4 @@
*/
export type JobType = 'anomaly-detector' | 'data-frame-analytics';
export const ML_SAVED_OBJECT_TYPE = 'ml-job';

View file

@ -9,6 +9,7 @@
import { HttpService } from '../http_service';
import { basePath } from './index';
import { JobType } from '../../../../common/types/saved_objects';
export const savedObjectsApiProvider = (httpService: HttpService) => ({
jobsSpaces() {
@ -17,4 +18,20 @@ export const savedObjectsApiProvider = (httpService: HttpService) => ({
method: 'GET',
});
},
assignJobToSpace(jobType: JobType, jobIds: string[], spaces: string[]) {
const body = JSON.stringify({ jobType, jobIds, spaces });
return httpService.http<any>({
path: `${basePath()}/saved_objects/assign_job_to_space`,
method: 'POST',
body,
});
},
removeJobFromSpace(jobType: JobType, jobIds: string[], spaces: string[]) {
const body = JSON.stringify({ jobType, jobIds, spaces });
return httpService.http<any>({
path: `${basePath()}/saved_objects/remove_job_from_space`,
method: 'POST',
body,
});
},
});

View file

@ -6,7 +6,7 @@
import { KibanaRequest } from 'kibana/server';
import type { MlClient } from '../../lib/ml_client';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
import {
MlCapabilities,
adminMlCapabilities,

View file

@ -5,7 +5,7 @@
*/
import type { MlClient } from '../../lib/ml_client';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
export function upgradeCheckProvider(mlClient: MlClient) {
async function isUpgradeInProgress(): Promise<boolean> {

View file

@ -5,7 +5,7 @@
*/
import { IScopedClusterClient } from 'kibana/server';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
import {
ML_ANNOTATIONS_INDEX_ALIAS_READ,

View file

@ -12,6 +12,7 @@ import {
RequestHandler,
SavedObjectsClientContract,
} from 'kibana/server';
import { SpacesPluginSetup } from '../../../spaces/server';
import { jobSavedObjectServiceFactory, JobSavedObjectService } from '../saved_objects';
import { MlLicense } from '../../common/license';
@ -28,14 +29,27 @@ type Handler = (handlerParams: {
}) => ReturnType<RequestHandler>;
type GetMlSavedObjectClient = (request: KibanaRequest) => SavedObjectsClientContract | null;
type GetInternalSavedObjectClient = () => SavedObjectsClientContract | null;
export class RouteGuard {
private _mlLicense: MlLicense;
private _getMlSavedObjectClient: GetMlSavedObjectClient;
private _getInternalSavedObjectClient: GetInternalSavedObjectClient;
private _spacesPlugin: SpacesPluginSetup | undefined;
private _isMlReady: () => Promise<void>;
constructor(mlLicense: MlLicense, getSavedObject: GetMlSavedObjectClient) {
constructor(
mlLicense: MlLicense,
getSavedObject: GetMlSavedObjectClient,
getInternalSavedObject: GetInternalSavedObjectClient,
spacesPlugin: SpacesPluginSetup | undefined,
isMlReady: () => Promise<void>
) {
this._mlLicense = mlLicense;
this._getMlSavedObjectClient = getSavedObject;
this._getInternalSavedObjectClient = getInternalSavedObject;
this._spacesPlugin = spacesPlugin;
this._isMlReady = isMlReady;
}
public fullLicenseAPIGuard(handler: Handler) {
@ -56,13 +70,19 @@ export class RouteGuard {
}
const mlSavedObjectClient = this._getMlSavedObjectClient(request);
if (mlSavedObjectClient === null) {
const internalSavedObjectsClient = this._getInternalSavedObjectClient();
if (mlSavedObjectClient === null || internalSavedObjectsClient === null) {
return response.badRequest({
body: { message: 'saved object client has not been initialized' },
});
}
const jobSavedObjectService = jobSavedObjectServiceFactory(mlSavedObjectClient);
const jobSavedObjectService = jobSavedObjectServiceFactory(
mlSavedObjectClient,
internalSavedObjectsClient,
this._spacesPlugin !== undefined,
this._isMlReady
);
const client = context.core.elasticsearch.client;
return handler({

View file

@ -6,36 +6,31 @@
import { Legacy } from 'kibana';
import { KibanaRequest } from 'kibana/server';
import { Space, SpacesPluginSetup } from '../../../spaces/server';
import { SpacesPluginSetup } from '../../../spaces/server';
export type RequestFacade = KibanaRequest | Legacy.Request;
interface GetActiveSpaceResponse {
valid: boolean;
space?: Space;
}
export function spacesUtilsProvider(spacesPlugin: SpacesPluginSetup, request: RequestFacade) {
async function activeSpace(): Promise<GetActiveSpaceResponse> {
try {
return {
valid: true,
space: await spacesPlugin.spacesService.getActiveSpace(request),
};
} catch (e) {
return {
valid: false,
};
}
}
export function spacesUtilsProvider(
spacesPlugin: SpacesPluginSetup | undefined,
request: RequestFacade
) {
async function isMlEnabledInSpace(): Promise<boolean> {
const { valid, space } = await activeSpace();
if (valid === true && space !== undefined) {
return space.disabledFeatures.includes('ml') === false;
if (spacesPlugin === undefined) {
// if spaces is disabled force isMlEnabledInSpace to be true
return true;
}
return true;
const space = await spacesPlugin.spacesService.getActiveSpace(request);
return space.disabledFeatures.includes('ml') === false;
}
return { isMlEnabledInSpace };
async function getAllSpaces(): Promise<string[] | null> {
if (spacesPlugin === undefined) {
return null;
}
const client = await spacesPlugin.spacesService.scopedClient(request);
const spaces = await client.getAll();
return spaces.map((s) => s.id);
}
return { isMlEnabledInSpace, getAllSpaces };
}

View file

@ -6,7 +6,7 @@
import { cloneDeep, each, remove, sortBy, get } from 'lodash';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
import { INTERVALS } from './intervals';
import { singleSeriesCheckerFactory } from './single_series_checker';

View file

@ -10,7 +10,7 @@
* Bucket spans: 5m, 10m, 30m, 1h, 3h
*/
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
import { INTERVALS, LONG_INTERVALS } from './intervals';
export function singleSeriesCheckerFactory({ asCurrentUser }) {

View file

@ -37,7 +37,7 @@ import {
prefixDatafeedId,
splitIndexPatternNames,
} from '../../../common/util/job_utils';
import { mlLog } from '../../client/log';
import { mlLog } from '../../lib/log';
import { calculateModelMemoryLimitProvider } from '../calculate_model_memory_limit';
import { fieldsServiceProvider } from '../fields_service';
import { jobServiceProvider } from '../job_service';

View file

@ -15,15 +15,15 @@ import {
CapabilitiesStart,
IClusterClient,
SavedObjectsServiceStart,
SavedObjectsClientContract,
} from 'kibana/server';
import { DEFAULT_APP_CATEGORIES } from '../../../../src/core/server';
import { SpacesPluginSetup } from '../../spaces/server';
import { PluginsSetup, RouteInitialization } from './types';
import { PLUGIN_ID } from '../common/constants/app';
import { MlCapabilities } from '../common/types/capabilities';
import { initMlTelemetry } from './lib/telemetry';
import { initMlServerLog } from './client/log';
import { initMlServerLog } from './lib/log';
import { initSampleDataSets } from './lib/sample_data_sets';
import { annotationRoutes } from './routes/annotations';
@ -50,7 +50,11 @@ import { getPluginPrivileges } from '../common/types/capabilities';
import { setupCapabilitiesSwitcher } from './lib/capabilities';
import { registerKibanaSettings } from './lib/register_settings';
import { trainedModelsRoutes } from './routes/trained_models';
import { setupSavedObjects } from './saved_objects';
import {
setupSavedObjects,
jobSavedObjectsInitializationFactory,
savedObjectClientsFactory,
} from './saved_objects';
import { RouteGuard } from './lib/route_guard';
export type MlPluginSetup = SharedServices;
@ -63,14 +67,19 @@ export class MlServerPlugin implements Plugin<MlPluginSetup, MlPluginStart, Plug
private capabilities: CapabilitiesStart | null = null;
private clusterClient: IClusterClient | null = null;
private savedObjectsStart: SavedObjectsServiceStart | null = null;
private spacesPlugin: SpacesPluginSetup | undefined;
private isMlReady: Promise<void>;
private setMlReady: () => void = () => {};
constructor(ctx: PluginInitializerContext) {
this.log = ctx.logger.get();
this.version = ctx.env.packageInfo.branch;
this.mlLicense = new MlLicense();
this.isMlReady = new Promise((resolve) => (this.setMlReady = resolve));
}
public setup(coreSetup: CoreSetup, plugins: PluginsSetup): MlPluginSetup {
this.spacesPlugin = plugins.spaces;
const { admin, user, apmUser } = getPluginPrivileges();
plugins.features.registerKibanaFeature({
@ -120,18 +129,19 @@ export class MlServerPlugin implements Plugin<MlPluginSetup, MlPluginStart, Plug
setupCapabilitiesSwitcher(coreSetup, plugins.licensing.license$, this.log);
setupSavedObjects(coreSetup.savedObjects);
const getMlSavedObjectsClient = (request: KibanaRequest): SavedObjectsClientContract | null => {
if (this.savedObjectsStart === null) {
return null;
}
return this.savedObjectsStart.getScopedClient(request, {
includedHiddenTypes: ['ml-job'],
});
};
const { getInternalSavedObjectsClient, getMlSavedObjectsClient } = savedObjectClientsFactory(
() => this.savedObjectsStart
);
const routeInit: RouteInitialization = {
router: coreSetup.http.createRouter(),
routeGuard: new RouteGuard(this.mlLicense, getMlSavedObjectsClient),
routeGuard: new RouteGuard(
this.mlLicense,
getMlSavedObjectsClient,
getInternalSavedObjectsClient,
plugins.spaces,
() => this.isMlReady
),
mlLicense: this.mlLicense,
};
@ -176,7 +186,9 @@ export class MlServerPlugin implements Plugin<MlPluginSetup, MlPluginStart, Plug
plugins.spaces,
plugins.cloud,
resolveMlCapabilities,
() => this.clusterClient
() => this.clusterClient,
() => getInternalSavedObjectsClient(),
() => this.isMlReady
),
};
}
@ -185,6 +197,16 @@ export class MlServerPlugin implements Plugin<MlPluginSetup, MlPluginStart, Plug
this.capabilities = coreStart.capabilities;
this.clusterClient = coreStart.elasticsearch.client;
this.savedObjectsStart = coreStart.savedObjects;
// check whether the job saved objects exist
// and create them if needed.
const { initializeJobs } = jobSavedObjectsInitializationFactory(
coreStart,
this.spacesPlugin !== undefined
);
initializeJobs().finally(() => {
this.setMlReady();
});
}
public stop() {

View file

@ -6,7 +6,7 @@
import { wrapError } from '../client/error_wrapper';
import { RouteInitialization } from '../types';
import { checksFactory } from '../saved_objects';
import { checksFactory, repairFactory } from '../saved_objects';
import { jobsAndSpaces, repairJobObjects } from './schemas/saved_objects';
/**
@ -67,7 +67,7 @@ export function savedObjectsRoutes({ router, routeGuard }: RouteInitialization)
routeGuard.fullLicenseAPIGuard(async ({ client, request, response, jobSavedObjectService }) => {
try {
const { simulate } = request.query;
const { repairJobs } = checksFactory(client, jobSavedObjectService);
const { repairJobs } = repairFactory(client, jobSavedObjectService);
const savedObjects = await repairJobs(simulate);
return response.ok({
@ -100,7 +100,7 @@ export function savedObjectsRoutes({ router, routeGuard }: RouteInitialization)
routeGuard.fullLicenseAPIGuard(async ({ client, request, response, jobSavedObjectService }) => {
try {
const { simulate } = request.query;
const { initSavedObjects } = checksFactory(client, jobSavedObjectService);
const { initSavedObjects } = repairFactory(client, jobSavedObjectService);
const savedObjects = await initSavedObjects(simulate);
return response.ok({

View file

@ -9,7 +9,7 @@ import { schema } from '@kbn/config-schema';
import { Request } from '@hapi/hapi';
import { IScopedClusterClient } from 'kibana/server';
import { wrapError } from '../client/error_wrapper';
import { mlLog } from '../client/log';
import { mlLog } from '../lib/log';
import { capabilitiesProvider } from '../lib/capabilities';
import { spacesUtilsProvider } from '../lib/spaces_utils';
import { RouteInitialization, SystemRouteDeps } from '../types';
@ -117,11 +117,7 @@ export function systemRoutes(
},
routeGuard.basicLicenseAPIGuard(async ({ mlClient, request, response }) => {
try {
// if spaces is disabled force isMlEnabledInSpace to be true
const { isMlEnabledInSpace } =
spaces !== undefined
? spacesUtilsProvider(spaces, (request as unknown) as Request)
: { isMlEnabledInSpace: async () => true };
const { isMlEnabledInSpace } = spacesUtilsProvider(spaces, (request as unknown) as Request);
const mlCapabilities = await resolveMlCapabilities(request);
if (mlCapabilities === null) {

View file

@ -4,11 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from '@hapi/boom';
import { IScopedClusterClient } from 'kibana/server';
import { SearchResponse } from 'elasticsearch';
import type { JobObject, JobSavedObjectService } from './service';
import { ML_SAVED_OBJECT_TYPE } from './saved_objects';
import type { JobSavedObjectService } from './service';
import { JobType } from '../../common/types/saved_objects';
import { Job } from '../../common/types/anomaly_detection_jobs';
@ -34,14 +31,6 @@ interface JobStatus {
};
}
interface SavedObjectJob {
[ML_SAVED_OBJECT_TYPE]: {
job_id: string;
type: JobType;
};
namespaces: string[];
}
interface StatusResponse {
savedObjects: {
[type in JobType]: JobSavedObjectStatus[];
@ -57,6 +46,7 @@ export function checksFactory(
) {
async function checkStatus(): Promise<StatusResponse> {
const jobObjects = await jobSavedObjectService.getAllJobObjects(undefined, false);
// load all non-space jobs and datafeeds
const { body: adJobs } = await client.asInternalUser.ml.getJobs<{ jobs: Job[] }>();
const { body: datafeeds } = await client.asInternalUser.ml.getDatafeeds<{
@ -95,16 +85,17 @@ export function checksFactory(
}
);
const nonSpaceSavedObjects = await _loadAllJobSavedObjects();
const allJobObjects = await jobSavedObjectService.getAllJobObjectsForAllSpaces();
const nonSpaceADObjectIds = new Set(
nonSpaceSavedObjects
.filter(({ type }) => type === 'anomaly-detector')
.map(({ jobId }) => jobId)
allJobObjects
.filter(({ attributes }) => attributes.type === 'anomaly-detector')
.map(({ attributes }) => attributes.job_id)
);
const nonSpaceDFAObjectIds = new Set(
nonSpaceSavedObjects
.filter(({ type }) => type === 'data-frame-analytics')
.map(({ jobId }) => jobId)
allJobObjects
.filter(({ attributes }) => attributes.type === 'data-frame-analytics')
.map(({ attributes }) => attributes.job_id)
);
const adObjectIds = new Set(
@ -163,230 +154,5 @@ export function checksFactory(
};
}
async function repairJobs(simulate: boolean = false) {
type Result = Record<string, { success: boolean; error?: any }>;
const results: {
savedObjectsCreated: Result;
savedObjectsDeleted: Result;
datafeedsAdded: Result;
datafeedsRemoved: Result;
} = {
savedObjectsCreated: {},
savedObjectsDeleted: {},
datafeedsAdded: {},
datafeedsRemoved: {},
};
const { body: datafeeds } = await client.asInternalUser.ml.getDatafeeds<{
datafeeds: Datafeed[];
}>();
const tasks: Array<() => Promise<void>> = [];
const status = await checkStatus();
for (const job of status.jobs['anomaly-detector']) {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.savedObjectsCreated[job.jobId] = { success: true };
} else {
// create AD saved objects for jobs which are missing them
const jobId = job.jobId;
const datafeedId = job.datafeedId;
tasks.push(async () => {
try {
await jobSavedObjectService.createAnomalyDetectionJob(jobId, datafeedId ?? undefined);
results.savedObjectsCreated[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsCreated[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.jobs['data-frame-analytics']) {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.savedObjectsCreated[job.jobId] = { success: true };
} else {
// create DFA saved objects for jobs which are missing them
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.createDataFrameAnalyticsJob(jobId);
results.savedObjectsCreated[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsCreated[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.savedObjects['anomaly-detector']) {
if (job.checks.jobExists === false) {
if (simulate === true) {
results.savedObjectsDeleted[job.jobId] = { success: true };
} else {
// Delete AD saved objects for jobs which no longer exist
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteAnomalyDetectionJob(jobId);
results.savedObjectsDeleted[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsDeleted[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.savedObjects['data-frame-analytics']) {
if (job.checks.jobExists === false) {
if (simulate === true) {
results.savedObjectsDeleted[job.jobId] = { success: true };
} else {
// Delete DFA saved objects for jobs which no longer exist
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteDataFrameAnalyticsJob(jobId);
results.savedObjectsDeleted[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsDeleted[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.savedObjects['anomaly-detector']) {
if (job.checks.datafeedExists === true && job.datafeedId === null) {
// add datafeed id for jobs where the datafeed exists but the id is missing from the saved object
if (simulate === true) {
results.datafeedsAdded[job.jobId] = { success: true };
} else {
const df = datafeeds.datafeeds.find((d) => d.job_id === job.jobId);
const jobId = job.jobId;
const datafeedId = df?.datafeed_id;
tasks.push(async () => {
try {
if (datafeedId !== undefined) {
await jobSavedObjectService.addDatafeed(datafeedId, jobId);
}
results.datafeedsAdded[job.jobId] = { success: true };
} catch (error) {
results.datafeedsAdded[job.jobId] = { success: false, error };
}
});
}
} else if (
job.checks.jobExists === true &&
job.checks.datafeedExists === false &&
job.datafeedId !== null &&
job.datafeedId !== undefined
) {
// remove datafeed id for jobs where the datafeed no longer exists but the id is populated in the saved object
if (simulate === true) {
results.datafeedsRemoved[job.jobId] = { success: true };
} else {
const datafeedId = job.datafeedId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteDatafeed(datafeedId);
results.datafeedsRemoved[job.jobId] = { success: true };
} catch (error) {
results.datafeedsRemoved[job.jobId] = { success: false, error: error.body ?? error };
}
});
}
}
}
await Promise.allSettled(tasks.map((t) => t()));
return results;
}
async function initSavedObjects(simulate: boolean = false, namespaces: string[] = ['*']) {
const results: { jobs: Array<{ id: string; type: string }>; success: boolean; error?: any } = {
jobs: [],
success: true,
};
const status = await checkStatus();
const jobs: JobObject[] = [];
const types: JobType[] = ['anomaly-detector', 'data-frame-analytics'];
types.forEach((type) => {
status.jobs[type].forEach((job) => {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.jobs.push({ id: job.jobId, type });
} else {
jobs.push({
job_id: job.jobId,
datafeed_id: job.datafeedId ?? null,
type,
});
}
}
});
});
try {
const createResults = await jobSavedObjectService.bulkCreateJobs(jobs, namespaces);
createResults.saved_objects.forEach(({ attributes }) => {
results.jobs.push({
id: attributes.job_id,
type: attributes.type,
});
});
} catch (error) {
results.success = false;
results.error = Boom.boomify(error).output;
}
return results;
}
async function _loadAllJobSavedObjects() {
const { body } = await client.asInternalUser.search<SearchResponse<SavedObjectJob>>({
index: '.kibana*',
size: 1000,
_source: ['ml-job.job_id', 'ml-job.type', 'namespaces'],
body: {
query: {
bool: {
filter: [
{
term: {
type: 'ml-job',
},
},
],
},
},
},
});
return body.hits.hits.map(({ _source }) => {
const { job_id: jobId, type } = _source[ML_SAVED_OBJECT_TYPE];
return {
jobId,
type,
spaces: _source.namespaces,
};
});
}
return { checkStatus, repairJobs, initSavedObjects };
return { checkStatus };
}

View file

@ -7,3 +7,6 @@
export { setupSavedObjects } from './saved_objects';
export { JobObject, JobSavedObjectService, jobSavedObjectServiceFactory } from './service';
export { checksFactory } from './checks';
export { repairFactory } from './repair';
export { jobSavedObjectsInitializationFactory } from './initialization';
export { savedObjectClientsFactory } from './util';

View file

@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { IScopedClusterClient, CoreStart, SavedObjectsClientContract } from 'kibana/server';
import { savedObjectClientsFactory } from './util';
import { repairFactory } from './repair';
import { jobSavedObjectServiceFactory, JobObject } from './service';
import { mlLog } from '../lib/log';
import { ML_SAVED_OBJECT_TYPE } from '../../common/types/saved_objects';
/**
* Creates initializeJobs function which is used to check whether
* ml job saved objects exist and creates them if needed
*
* @param core: CoreStart
*/
export function jobSavedObjectsInitializationFactory(core: CoreStart, spacesEnabled: boolean) {
const client = (core.elasticsearch.client as unknown) as IScopedClusterClient;
/**
* Check whether ML saved objects exist.
* If they don't, check to see whether ML jobs exist.
* If jobs exist, but the saved objects do not, create the saved objects.
*
*/
async function initializeJobs() {
try {
const { getInternalSavedObjectsClient } = savedObjectClientsFactory(() => core.savedObjects);
const savedObjectsClient = getInternalSavedObjectsClient();
if (savedObjectsClient === null) {
mlLog.error('Internal saved object client not initialized!');
return;
}
const jobSavedObjectService = jobSavedObjectServiceFactory(
savedObjectsClient,
savedObjectsClient,
spacesEnabled,
() => Promise.resolve() // pretend isMlReady, to allow us to initialize the saved objects
);
if ((await _needsInitializing(savedObjectsClient)) === false) {
// ml job saved objects have already been initialized
return;
}
mlLog.info('Initializing job saved objects');
const { initSavedObjects } = repairFactory(client, jobSavedObjectService);
const { jobs } = await initSavedObjects();
mlLog.info(`${jobs.length} job saved objects initialized for * space`);
} catch (error) {
mlLog.error(`Error Initializing jobs ${JSON.stringify(error)}`);
}
}
async function _needsInitializing(savedObjectsClient: SavedObjectsClientContract) {
if (await _jobSavedObjectsExist(savedObjectsClient)) {
// at least one ml saved object exists
// this has been initialized before
return false;
}
if (await _jobsExist()) {
// some ml jobs exist, we need to create those saved objects
return true;
}
// no ml jobs actually exist,
// that's why there were no saved objects
return false;
}
async function _jobSavedObjectsExist(savedObjectsClient: SavedObjectsClientContract) {
const options = {
type: ML_SAVED_OBJECT_TYPE,
perPage: 0,
namespaces: ['*'],
};
const { total } = await savedObjectsClient.find<JobObject>(options);
return total > 0;
}
async function _jobsExist() {
// it would be better to use a simple count search here
// but the kibana user does not have access to .ml-config
//
// const { body } = await client.asInternalUser.count({
// index: '.ml-config',
// });
// return body.count > 0;
const { body: adJobs } = await client.asInternalUser.ml.getJobs<{ count: number }>();
const { body: dfaJobs } = await client.asInternalUser.ml.getDataFrameAnalytics<{
count: number;
}>();
return adJobs.count > 0 || dfaJobs.count > 0;
}
return { initializeJobs };
}

View file

@ -0,0 +1,217 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from '@hapi/boom';
import { IScopedClusterClient } from 'kibana/server';
import type { JobObject, JobSavedObjectService } from './service';
import { JobType } from '../../common/types/saved_objects';
import { checksFactory } from './checks';
import { Datafeed } from '../../common/types/anomaly_detection_jobs';
export function repairFactory(
client: IScopedClusterClient,
jobSavedObjectService: JobSavedObjectService
) {
const { checkStatus } = checksFactory(client, jobSavedObjectService);
async function repairJobs(simulate: boolean = false) {
type Result = Record<string, { success: boolean; error?: any }>;
const results: {
savedObjectsCreated: Result;
savedObjectsDeleted: Result;
datafeedsAdded: Result;
datafeedsRemoved: Result;
} = {
savedObjectsCreated: {},
savedObjectsDeleted: {},
datafeedsAdded: {},
datafeedsRemoved: {},
};
const { body: datafeeds } = await client.asInternalUser.ml.getDatafeeds<{
datafeeds: Datafeed[];
}>();
const tasks: Array<() => Promise<void>> = [];
const status = await checkStatus();
for (const job of status.jobs['anomaly-detector']) {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.savedObjectsCreated[job.jobId] = { success: true };
} else {
// create AD saved objects for jobs which are missing them
const jobId = job.jobId;
const datafeedId = job.datafeedId;
tasks.push(async () => {
try {
await jobSavedObjectService.createAnomalyDetectionJob(jobId, datafeedId ?? undefined);
results.savedObjectsCreated[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsCreated[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.jobs['data-frame-analytics']) {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.savedObjectsCreated[job.jobId] = { success: true };
} else {
// create DFA saved objects for jobs which are missing them
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.createDataFrameAnalyticsJob(jobId);
results.savedObjectsCreated[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsCreated[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.savedObjects['anomaly-detector']) {
if (job.checks.jobExists === false) {
if (simulate === true) {
results.savedObjectsDeleted[job.jobId] = { success: true };
} else {
// Delete AD saved objects for jobs which no longer exist
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteAnomalyDetectionJob(jobId);
results.savedObjectsDeleted[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsDeleted[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.savedObjects['data-frame-analytics']) {
if (job.checks.jobExists === false) {
if (simulate === true) {
results.savedObjectsDeleted[job.jobId] = { success: true };
} else {
// Delete DFA saved objects for jobs which no longer exist
const jobId = job.jobId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteDataFrameAnalyticsJob(jobId);
results.savedObjectsDeleted[job.jobId] = { success: true };
} catch (error) {
results.savedObjectsDeleted[job.jobId] = {
success: false,
error: error.body ?? error,
};
}
});
}
}
}
for (const job of status.savedObjects['anomaly-detector']) {
if (job.checks.datafeedExists === true && job.datafeedId === null) {
// add datafeed id for jobs where the datafeed exists but the id is missing from the saved object
if (simulate === true) {
results.datafeedsAdded[job.jobId] = { success: true };
} else {
const df = datafeeds.datafeeds.find((d) => d.job_id === job.jobId);
const jobId = job.jobId;
const datafeedId = df?.datafeed_id;
tasks.push(async () => {
try {
if (datafeedId !== undefined) {
await jobSavedObjectService.addDatafeed(datafeedId, jobId);
}
results.datafeedsAdded[job.jobId] = { success: true };
} catch (error) {
results.datafeedsAdded[job.jobId] = { success: false, error };
}
});
}
} else if (
job.checks.jobExists === true &&
job.checks.datafeedExists === false &&
job.datafeedId !== null &&
job.datafeedId !== undefined
) {
// remove datafeed id for jobs where the datafeed no longer exists but the id is populated in the saved object
if (simulate === true) {
results.datafeedsRemoved[job.jobId] = { success: true };
} else {
const datafeedId = job.datafeedId;
tasks.push(async () => {
try {
await jobSavedObjectService.deleteDatafeed(datafeedId);
results.datafeedsRemoved[job.jobId] = { success: true };
} catch (error) {
results.datafeedsRemoved[job.jobId] = { success: false, error: error.body ?? error };
}
});
}
}
}
await Promise.allSettled(tasks.map((t) => t()));
return results;
}
async function initSavedObjects(simulate: boolean = false, namespaces: string[] = ['*']) {
const results: { jobs: Array<{ id: string; type: string }>; success: boolean; error?: any } = {
jobs: [],
success: true,
};
const status = await checkStatus();
const jobs: JobObject[] = [];
const types: JobType[] = ['anomaly-detector', 'data-frame-analytics'];
types.forEach((type) => {
status.jobs[type].forEach((job) => {
if (job.checks.savedObjectExits === false) {
if (simulate === true) {
results.jobs.push({ id: job.jobId, type });
} else {
jobs.push({
job_id: job.jobId,
datafeed_id: job.datafeedId ?? null,
type,
});
}
}
});
});
try {
const createResults = await jobSavedObjectService.bulkCreateJobs(jobs, namespaces);
createResults.saved_objects.forEach(({ attributes }) => {
results.jobs.push({
id: attributes.job_id,
type: attributes.type,
});
});
} catch (error) {
results.success = false;
results.error = Boom.boomify(error).output;
}
return results;
}
return { checkStatus, repairJobs, initSavedObjects };
}

View file

@ -8,8 +8,7 @@ import { SavedObjectsServiceSetup } from 'kibana/server';
import mappings from './mappings.json';
import { migrations } from './migrations';
export const ML_SAVED_OBJECT_TYPE = 'ml-job';
import { ML_SAVED_OBJECT_TYPE } from '../../common/types/saved_objects';
export function setupSavedObjects(savedObjects: SavedObjectsServiceSetup) {
savedObjects.registerType({

View file

@ -6,8 +6,7 @@
import RE2 from 're2';
import { SavedObjectsClientContract, SavedObjectsFindOptions } from 'kibana/server';
import { ML_SAVED_OBJECT_TYPE } from './saved_objects';
import { JobType } from '../../common/types/saved_objects';
import { JobType, ML_SAVED_OBJECT_TYPE } from '../../common/types/saved_objects';
import { MLJobNotFound } from '../lib/ml_client';
export interface JobObject {
@ -19,13 +18,19 @@ type JobObjectFilter = { [k in keyof JobObject]?: string };
export type JobSavedObjectService = ReturnType<typeof jobSavedObjectServiceFactory>;
export function jobSavedObjectServiceFactory(savedObjectsClient: SavedObjectsClientContract) {
export function jobSavedObjectServiceFactory(
savedObjectsClient: SavedObjectsClientContract,
internalSavedObjectsClient: SavedObjectsClientContract,
spacesEnabled: boolean,
isMlReady: () => Promise<void>
) {
async function _getJobObjects(
jobType?: JobType,
jobId?: string,
datafeedId?: string,
currentSpaceOnly: boolean = true
) {
await isMlReady();
const filterObject: JobObjectFilter = {};
if (jobType !== undefined) {
@ -41,7 +46,7 @@ export function jobSavedObjectServiceFactory(savedObjectsClient: SavedObjectsCli
const options: SavedObjectsFindOptions = {
type: ML_SAVED_OBJECT_TYPE,
perPage: 10000,
...(currentSpaceOnly === true ? {} : { namespaces: ['*'] }),
...(spacesEnabled === false || currentSpaceOnly === true ? {} : { namespaces: ['*'] }),
searchFields,
filter,
};
@ -52,23 +57,24 @@ export function jobSavedObjectServiceFactory(savedObjectsClient: SavedObjectsCli
}
async function _createJob(jobType: JobType, jobId: string, datafeedId?: string) {
try {
await _deleteJob(jobType, jobId);
} catch (error) {
// fail silently
// the job object may or may not already exist, we'll overwrite it anyway.
}
await savedObjectsClient.create<JobObject>(ML_SAVED_OBJECT_TYPE, {
job_id: jobId,
datafeed_id: datafeedId ?? null,
type: jobType,
});
await isMlReady();
await savedObjectsClient.create<JobObject>(
ML_SAVED_OBJECT_TYPE,
{
job_id: jobId,
datafeed_id: datafeedId ?? null,
type: jobType,
},
{ id: jobId, overwrite: true }
);
}
async function _bulkCreateJobs(jobs: JobObject[], namespaces?: string[]) {
await isMlReady();
return await savedObjectsClient.bulkCreate<JobObject>(
jobs.map((j) => ({
type: ML_SAVED_OBJECT_TYPE,
id: j.job_id,
attributes: j,
initialNamespaces: namespaces,
}))
@ -82,7 +88,7 @@ export function jobSavedObjectServiceFactory(savedObjectsClient: SavedObjectsCli
throw new MLJobNotFound('job not found');
}
await savedObjectsClient.delete(ML_SAVED_OBJECT_TYPE, job.id);
await savedObjectsClient.delete(ML_SAVED_OBJECT_TYPE, job.id, { force: true });
}
async function createAnomalyDetectionJob(jobId: string, datafeedId?: string) {
@ -109,6 +115,26 @@ export function jobSavedObjectServiceFactory(savedObjectsClient: SavedObjectsCli
return await _getJobObjects(jobType, undefined, undefined, currentSpaceOnly);
}
async function getAllJobObjectsForAllSpaces(jobType?: JobType) {
await isMlReady();
const filterObject: JobObjectFilter = {};
if (jobType !== undefined) {
filterObject.type = jobType;
}
const { filter, searchFields } = createSavedObjectFilter(filterObject);
const options: SavedObjectsFindOptions = {
type: ML_SAVED_OBJECT_TYPE,
perPage: 10000,
...(spacesEnabled === false ? {} : { namespaces: ['*'] }),
searchFields,
filter,
};
return (await internalSavedObjectsClient.find<JobObject>(options)).saved_objects;
}
async function addDatafeed(datafeedId: string, jobId: string) {
const jobs = await _getJobObjects('anomaly-detector', jobId);
const job = jobs[0];
@ -268,6 +294,7 @@ export function jobSavedObjectServiceFactory(savedObjectsClient: SavedObjectsCli
assignJobsToSpaces,
removeJobsFromSpaces,
bulkCreateJobs,
getAllJobObjectsForAllSpaces,
};
}

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SavedObjectsServiceStart, KibanaRequest } from 'kibana/server';
import { SavedObjectsClient } from '../../../../../src/core/server';
import { ML_SAVED_OBJECT_TYPE } from '../../common/types/saved_objects';
export function savedObjectClientsFactory(
getSavedObjectsStart: () => SavedObjectsServiceStart | null
) {
return {
// create a saved object client scoped to the current request
// which has access to ml-job objects
getMlSavedObjectsClient: (request: KibanaRequest) => {
const savedObjectsStart = getSavedObjectsStart();
if (savedObjectsStart === null) {
return null;
}
return savedObjectsStart.getScopedClient(request, {
includedHiddenTypes: [ML_SAVED_OBJECT_TYPE],
});
},
// create a saved object client which has access to all saved objects
// no matter the space access of the current user.
getInternalSavedObjectsClient: () => {
const savedObjectsStart = getSavedObjectsStart();
if (savedObjectsStart === null) {
return null;
}
const savedObjectsRepo = savedObjectsStart.createInternalRepository();
return new SavedObjectsClient(savedObjectsRepo);
},
};
}

View file

@ -44,10 +44,7 @@ export function getMlSystemProvider(
return await getGuards(request, savedObjectsClient)
.isMinimumLicense()
.ok(async ({ mlClient }) => {
const { isMlEnabledInSpace } =
spaces !== undefined
? spacesUtilsProvider(spaces, request)
: { isMlEnabledInSpace: async () => true };
const { isMlEnabledInSpace } = spacesUtilsProvider(spaces, request);
const mlCapabilities = await resolveMlCapabilities(request);
if (mlCapabilities === null) {

View file

@ -5,13 +5,13 @@
*/
import { IClusterClient, IScopedClusterClient, SavedObjectsClientContract } from 'kibana/server';
import { SpacesPluginSetup } from '../../../spaces/server';
// including KibanaRequest from 'kibana/server' causes an error
// when being used with instanceof
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { KibanaRequest } from '../../.././../../src/core/server/http';
import { MlLicense } from '../../common/license';
import { SpacesPluginSetup } from '../../../spaces/server';
import { CloudSetup } from '../../../cloud/server';
import { licenseChecks } from './license_checks';
import { MlSystemProvider, getMlSystemProvider } from './providers/system';
@ -59,22 +59,31 @@ type OkCallback = (okParams: OkParams) => any;
export function createSharedServices(
mlLicense: MlLicense,
spaces: SpacesPluginSetup | undefined,
spacesPlugin: SpacesPluginSetup | undefined,
cloud: CloudSetup,
resolveMlCapabilities: ResolveMlCapabilities,
getClusterClient: () => IClusterClient | null
getClusterClient: () => IClusterClient | null,
getInternalSavedObjectsClient: () => SavedObjectsClientContract | null,
isMlReady: () => Promise<void>
): SharedServices {
const { isFullLicense, isMinimumLicense } = licenseChecks(mlLicense);
function getGuards(
request: KibanaRequest,
savedObjectsClient: SavedObjectsClientContract
): Guards {
const internalSavedObjectsClient = getInternalSavedObjectsClient();
if (internalSavedObjectsClient === null) {
throw new Error('Internal saved object client not initialized');
}
const getRequestItems = getRequestItemsProvider(
resolveMlCapabilities,
getClusterClient,
savedObjectsClient
savedObjectsClient,
internalSavedObjectsClient,
spacesPlugin !== undefined,
isMlReady
);
const { hasMlCapabilities, scopedClient, mlClient } = getRequestItems(request);
const asyncGuards: Array<Promise<void>> = [];
@ -104,14 +113,17 @@ export function createSharedServices(
...getAnomalyDetectorsProvider(getGuards),
...getModulesProvider(getGuards),
...getResultsServiceProvider(getGuards),
...getMlSystemProvider(getGuards, mlLicense, spaces, cloud, resolveMlCapabilities),
...getMlSystemProvider(getGuards, mlLicense, spacesPlugin, cloud, resolveMlCapabilities),
};
}
function getRequestItemsProvider(
resolveMlCapabilities: ResolveMlCapabilities,
getClusterClient: () => IClusterClient | null,
savedObjectsClient: SavedObjectsClientContract
savedObjectsClient: SavedObjectsClientContract,
internalSavedObjectsClient: SavedObjectsClientContract,
spaceEnabled: boolean,
isMlReady: () => Promise<void>
) {
return (request: KibanaRequest) => {
const getHasMlCapabilities = hasMlCapabilitiesProvider(resolveMlCapabilities);
@ -122,7 +134,12 @@ function getRequestItemsProvider(
// will not receive a real request object when being called from an alert.
// instead a dummy request object will be supplied
const clusterClient = getClusterClient();
const jobSavedObjectService = jobSavedObjectServiceFactory(savedObjectsClient);
const jobSavedObjectService = jobSavedObjectServiceFactory(
savedObjectsClient,
internalSavedObjectsClient,
spaceEnabled,
isMlReady
);
if (clusterClient === null) {
throw new MLClusterClientUninitialized(`ML's cluster client has not been initialized`);