[FLEET] Adding support for installing ML models (#107710)

* adds support for saved object based ml models

* adds es asset type and ml model install handler

* wip: handle top level pipeline install

* remove unnecessary mlModel savedObject type

* add package manifest license check

* get modelid from model path

* add fleet api test for ml model

* replace test mlModel for api test with smaller test model

* cleanup install/remove and ensure pipelines are retained when upgrading

* fix types - update test model id

* fix types

* remove hard coded ml category and check top level pipeline on upgrade

* update ml model test file

* ensure deduplicated asset refs are saved

* Fix api integration update test

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Nicolas Chaulet <nicolas.chaulet@elastic.co>
This commit is contained in:
Melissa Alvarez 2021-10-15 13:06:57 -04:00 committed by GitHub
parent 5fcc118913
commit c240ccff86
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 523 additions and 49 deletions

View file

@ -7,7 +7,7 @@
import type { Observable, Subscription } from 'rxjs';
import type { ILicense } from '../../../licensing/common/types';
import type { ILicense, LicenseType } from '../../../licensing/common/types';
// Generic license service class that works with the license observable
// Both server and client plugins instancates a singleton version of this class
@ -53,4 +53,11 @@ export class LicenseService {
this.licenseInformation?.hasAtLeast('enterprise')
);
}
public hasAtLeast(licenseType: LicenseType) {
return (
this.licenseInformation?.isAvailable &&
this.licenseInformation?.isActive &&
this.licenseInformation?.hasAtLeast(licenseType)
);
}
}

View file

@ -42,6 +42,7 @@ describe('Fleet - packageToPackagePolicy', () => {
transform: [],
ilm_policy: [],
data_stream_ilm_policy: [],
ml_model: [],
},
},
status: 'not_installed',

View file

@ -94,6 +94,7 @@ export enum ElasticsearchAssetType {
ilmPolicy = 'ilm_policy',
transform = 'transform',
dataStreamIlmPolicy = 'data_stream_ilm_policy',
mlModel = 'ml_model',
}
export type DataType = typeof dataTypes;

View file

@ -41,11 +41,11 @@ export const AssetsFacetGroup = ({ width }: Args) => {
elasticsearch: {
component_template: [],
data_stream_ilm_policy: [],
data_stream: [],
ilm_policy: [],
index_template: [],
ingest_pipeline: [],
transform: [],
ml_model: [],
},
}}
/>

View file

@ -65,6 +65,9 @@ export const AssetTitleMap: Record<DisplayedAssetType, string> = {
ml_module: i18n.translate('xpack.fleet.epm.assetTitles.mlModules', {
defaultMessage: 'ML modules',
}),
ml_model: i18n.translate('xpack.fleet.epm.assetTitles.mlModels', {
defaultMessage: 'ML models',
}),
view: i18n.translate('xpack.fleet.epm.assetTitles.views', {
defaultMessage: 'Views',
}),

View file

@ -5,6 +5,6 @@
* 2.0.
*/
export { installPipelines } from './install';
export { installPipelines, isTopLevelPipeline } from './install';
export { deletePreviousPipelines, deletePipeline } from './remove';

View file

@ -28,6 +28,13 @@ interface RewriteSubstitution {
templateFunction: string;
}
export const isTopLevelPipeline = (path: string) => {
const pathParts = getPathParts(path);
return (
pathParts.type === ElasticsearchAssetType.ingestPipeline && pathParts.dataset === undefined
);
};
export const installPipelines = async (
installablePackage: InstallablePackage,
paths: string[],
@ -39,25 +46,41 @@ export const installPipelines = async (
// so do not remove the currently installed pipelines here
const dataStreams = installablePackage.data_streams;
const { name: pkgName, version: pkgVersion } = installablePackage;
if (!dataStreams?.length) return [];
const pipelinePaths = paths.filter((path) => isPipeline(path));
const topLevelPipelinePaths = paths.filter((path) => isTopLevelPipeline(path));
if (!dataStreams?.length && topLevelPipelinePaths.length === 0) return [];
// get and save pipeline refs before installing pipelines
const pipelineRefs = dataStreams.reduce<EsAssetReference[]>((acc, dataStream) => {
const filteredPaths = pipelinePaths.filter((path) =>
isDataStreamPipeline(path, dataStream.path)
);
const pipelineObjectRefs = filteredPaths.map((path) => {
const { name } = getNameAndExtension(path);
const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
dataStream,
packageVersion: installablePackage.version,
});
return { id: nameForInstallation, type: ElasticsearchAssetType.ingestPipeline };
let pipelineRefs = dataStreams
? dataStreams.reduce<EsAssetReference[]>((acc, dataStream) => {
const filteredPaths = pipelinePaths.filter((path) =>
isDataStreamPipeline(path, dataStream.path)
);
const pipelineObjectRefs = filteredPaths.map((path) => {
const { name } = getNameAndExtension(path);
const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
dataStream,
packageVersion: installablePackage.version,
});
return { id: nameForInstallation, type: ElasticsearchAssetType.ingestPipeline };
});
acc.push(...pipelineObjectRefs);
return acc;
}, [])
: [];
const topLevelPipelineRefs = topLevelPipelinePaths.map((path) => {
const { name } = getNameAndExtension(path);
const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
packageVersion: installablePackage.version,
});
acc.push(...pipelineObjectRefs);
return acc;
}, []);
return { id: nameForInstallation, type: ElasticsearchAssetType.ingestPipeline };
});
pipelineRefs = [...pipelineRefs, ...topLevelPipelineRefs];
// check that we don't duplicate the pipeline refs if the user is reinstalling
const installedPkg = await getInstallationObject({
@ -73,19 +96,33 @@ export const installPipelines = async (
pkgVersion
);
await saveInstalledEsRefs(savedObjectsClient, installablePackage.name, pipelineRefs);
const pipelines = dataStreams.reduce<Array<Promise<EsAssetReference[]>>>((acc, dataStream) => {
if (dataStream.ingest_pipeline) {
acc.push(
installPipelinesForDataStream({
dataStream,
esClient,
paths: pipelinePaths,
pkgVersion: installablePackage.version,
})
);
}
return acc;
}, []);
const pipelines = dataStreams
? dataStreams.reduce<Array<Promise<EsAssetReference[]>>>((acc, dataStream) => {
if (dataStream.ingest_pipeline) {
acc.push(
installAllPipelines({
dataStream,
esClient,
paths: pipelinePaths,
pkgVersion: installablePackage.version,
})
);
}
return acc;
}, [])
: [];
if (topLevelPipelinePaths) {
pipelines.push(
installAllPipelines({
dataStream: undefined,
esClient,
paths: topLevelPipelinePaths,
pkgVersion: installablePackage.version,
})
);
}
return await Promise.all(pipelines).then((results) => results.flat());
};
@ -110,7 +147,7 @@ export function rewriteIngestPipeline(
return pipeline;
}
export async function installPipelinesForDataStream({
export async function installAllPipelines({
esClient,
pkgVersion,
paths,
@ -119,9 +156,11 @@ export async function installPipelinesForDataStream({
esClient: ElasticsearchClient;
pkgVersion: string;
paths: string[];
dataStream: RegistryDataStream;
dataStream?: RegistryDataStream;
}): Promise<EsAssetReference[]> {
const pipelinePaths = paths.filter((path) => isDataStreamPipeline(path, dataStream.path));
const pipelinePaths = dataStream
? paths.filter((path) => isDataStreamPipeline(path, dataStream.path))
: paths;
let pipelines: any[] = [];
const substitutions: RewriteSubstitution[] = [];
@ -256,11 +295,15 @@ export const getPipelineNameForInstallation = ({
packageVersion,
}: {
pipelineName: string;
dataStream: RegistryDataStream;
dataStream?: RegistryDataStream;
packageVersion: string;
}): string => {
const isPipelineEntry = pipelineName === dataStream.ingest_pipeline;
const suffix = isPipelineEntry ? '' : `-${pipelineName}`;
// if this is the pipeline entry, don't add a suffix
return `${dataStream.type}-${dataStream.dataset}-${packageVersion}${suffix}`;
if (dataStream !== undefined) {
const isPipelineEntry = pipelineName === dataStream.ingest_pipeline;
const suffix = isPipelineEntry ? '' : `-${pipelineName}`;
// if this is the pipeline entry, don't add a suffix
return `${dataStream.type}-${dataStream.dataset}-${packageVersion}${suffix}`;
}
// It's a top-level pipeline
return `${packageVersion}-${pipelineName}`;
};

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { getAsset } from '../../archive';

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { installMlModel } from './install';
export { deleteMlModel } from './remove';

View file

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { saveInstalledEsRefs } from '../../packages/install';
import { getPathParts } from '../../archive';
import { ElasticsearchAssetType } from '../../../../../common/types/models';
import type { EsAssetReference, InstallablePackage } from '../../../../../common/types/models';
import { getAsset } from './common';
interface MlModelInstallation {
installationName: string;
content: string;
}
export const installMlModel = async (
installablePackage: InstallablePackage,
paths: string[],
esClient: ElasticsearchClient,
savedObjectsClient: SavedObjectsClientContract
) => {
const mlModelPath = paths.find((path) => isMlModel(path));
const installedMlModels: EsAssetReference[] = [];
if (mlModelPath !== undefined) {
const content = getAsset(mlModelPath).toString('utf-8');
const pathParts = mlModelPath.split('/');
const modelId = pathParts[pathParts.length - 1].replace('.json', '');
const mlModelRef = {
id: modelId,
type: ElasticsearchAssetType.mlModel,
};
// get and save ml model refs before installing ml model
await saveInstalledEsRefs(savedObjectsClient, installablePackage.name, [mlModelRef]);
const mlModel: MlModelInstallation = {
installationName: modelId,
content,
};
const result = await handleMlModelInstall({ esClient, mlModel });
installedMlModels.push(result);
}
return installedMlModels;
};
const isMlModel = (path: string) => {
const pathParts = getPathParts(path);
return !path.endsWith('/') && pathParts.type === ElasticsearchAssetType.mlModel;
};
async function handleMlModelInstall({
esClient,
mlModel,
}: {
esClient: ElasticsearchClient;
mlModel: MlModelInstallation;
}): Promise<EsAssetReference> {
try {
await esClient.ml.putTrainedModel({
model_id: mlModel.installationName,
defer_definition_decompression: true,
timeout: '45s',
body: mlModel.content,
});
} catch (err) {
// swallow the error if the ml model already exists.
const isAlreadyExistError =
err instanceof ResponseError &&
err?.body?.error?.type === 'resource_already_exists_exception';
if (!isAlreadyExistError) {
throw err;
}
}
return { id: mlModel.installationName, type: ElasticsearchAssetType.mlModel };
}

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from 'kibana/server';
import { appContextService } from '../../../app_context';
export const deleteMlModel = async (esClient: ElasticsearchClient, mlModelIds: string[]) => {
const logger = appContextService.getLogger();
if (mlModelIds.length) {
logger.info(`Deleting currently installed ml model ids ${mlModelIds}`);
}
await Promise.all(
mlModelIds.map(async (modelId) => {
await esClient.ml.deleteTrainedModel({ model_id: modelId }, { ignore: [404] });
logger.info(`Deleted: ${modelId}`);
})
);
};

View file

@ -17,12 +17,17 @@ import type { InstallablePackage, InstallSource, PackageAssetReference } from '.
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../constants';
import type { AssetReference, Installation, InstallType } from '../../../types';
import { installTemplates } from '../elasticsearch/template/install';
import { installPipelines, deletePreviousPipelines } from '../elasticsearch/ingest_pipeline/';
import {
installPipelines,
isTopLevelPipeline,
deletePreviousPipelines,
} from '../elasticsearch/ingest_pipeline/';
import { getAllTemplateRefs } from '../elasticsearch/template/install';
import { installILMPolicy } from '../elasticsearch/ilm/install';
import { installKibanaAssets, getKibanaAssets } from '../kibana/assets/install';
import { updateCurrentWriteIndices } from '../elasticsearch/template/template';
import { installTransform } from '../elasticsearch/transform/install';
import { installMlModel } from '../elasticsearch/ml_model/';
import { installIlmForDataStream } from '../elasticsearch/datastream_ilm/install';
import { saveArchiveEntries } from '../archive/storage';
import { ConcurrentInstallOperationError } from '../../../errors';
@ -54,6 +59,7 @@ export async function _installPackage({
installSource: InstallSource;
}): Promise<AssetReference[]> {
const { name: pkgName, version: pkgVersion } = packageInfo;
try {
// if some installation already exists
if (installedPkg) {
@ -134,6 +140,9 @@ export async function _installPackage({
savedObjectsClient
);
// installs ml models
const installedMlModel = await installMlModel(packageInfo, paths, esClient, savedObjectsClient);
// installs versionized pipelines without removing currently installed ones
const installedPipelines = await installPipelines(
packageInfo,
@ -159,8 +168,14 @@ export async function _installPackage({
savedObjectsClient
);
// if this is an update or retrying an update, delete the previous version's pipelines
if ((installType === 'update' || installType === 'reupdate') && installedPkg) {
// If this is an update or retrying an update, delete the previous version's pipelines
// Top-level pipeline assets will not be removed on upgrade as of ml model package addition which requires previous
// assets to remain installed. This is a temporary solution - more robust solution tracked here https://github.com/elastic/kibana/issues/115035
if (
paths.filter((path) => isTopLevelPipeline(path)).length === 0 &&
(installType === 'update' || installType === 'reupdate') &&
installedPkg
) {
await deletePreviousPipelines(
esClient,
savedObjectsClient,
@ -227,6 +242,7 @@ export async function _installPackage({
...installedDataStreamIlm,
...installedTemplateRefs,
...installedTransforms,
...installedMlModel,
];
} catch (err) {
if (savedObjectsClient.errors.isConflictError(err)) {

View file

@ -25,6 +25,7 @@ import {
} from '../../../errors';
import { PACKAGES_SAVED_OBJECT_TYPE, MAX_TIME_COMPLETE_INSTALL } from '../../../constants';
import type { KibanaAssetType } from '../../../types';
import { licenseService } from '../../';
import type {
Installation,
AssetType,
@ -264,6 +265,10 @@ async function installPackageFromRegistry({
// get package info
const { paths, packageInfo } = await Registry.getRegistryPackage(pkgName, pkgVersion);
if (!licenseService.hasAtLeast(packageInfo.license || 'basic')) {
return { error: new Error(`Requires ${packageInfo.license} license`), installType };
}
// try installing the package, if there was an error, call error handler and rethrow
// @ts-expect-error status is string instead of InstallResult.status 'installed' | 'already_installed'
return _installPackage({
@ -506,8 +511,19 @@ export const saveInstalledEsRefs = async (
) => {
const installedPkg = await getInstallationObject({ savedObjectsClient, pkgName });
const installedAssetsToSave = installedPkg?.attributes.installed_es.concat(installedAssets);
const deduplicatedAssets =
installedAssetsToSave?.reduce((acc, currentAsset) => {
const foundAsset = acc.find((asset: EsAssetReference) => asset.id === currentAsset.id);
if (!foundAsset) {
return acc.concat([currentAsset]);
} else {
return acc;
}
}, [] as EsAssetReference[]) || [];
await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
installed_es: installedAssetsToSave,
installed_es: deduplicatedAssets,
});
return installedAssets;
};

View file

@ -20,6 +20,7 @@ import type {
import { deletePipeline } from '../elasticsearch/ingest_pipeline/';
import { installIndexPatterns } from '../kibana/index_pattern/install';
import { deleteTransforms } from '../elasticsearch/transform/remove';
import { deleteMlModel } from '../elasticsearch/ml_model';
import { packagePolicyService, appContextService } from '../..';
import { splitPkgKey } from '../registry';
import { deletePackageCache } from '../archive';
@ -105,6 +106,8 @@ function deleteESAssets(
return deleteTransforms(esClient, [id]);
} else if (assetType === ElasticsearchAssetType.dataStreamIlmPolicy) {
return deleteIlms(esClient, [id]);
} else if (assetType === ElasticsearchAssetType.mlModel) {
return deleteMlModel(esClient, [id]);
}
});
}
@ -117,11 +120,15 @@ async function deleteAssets(
const logger = appContextService.getLogger();
// must delete index templates first, or component templates which reference them cannot be deleted
// separate the assets into Index Templates and other assets
// must delete ingestPipelines first, or ml models referenced in them cannot be deleted.
// separate the assets into Index Templates and other assets.
type Tuple = [EsAssetReference[], EsAssetReference[]];
const [indexTemplates, otherAssets] = installedEs.reduce<Tuple>(
const [indexTemplatesAndPipelines, otherAssets] = installedEs.reduce<Tuple>(
([indexAssetTypes, otherAssetTypes], asset) => {
if (asset.type === ElasticsearchAssetType.indexTemplate) {
if (
asset.type === ElasticsearchAssetType.indexTemplate ||
asset.type === ElasticsearchAssetType.ingestPipeline
) {
indexAssetTypes.push(asset);
} else {
otherAssetTypes.push(asset);
@ -133,8 +140,8 @@ async function deleteAssets(
);
try {
// must delete index templates first
await Promise.all(deleteESAssets(indexTemplates, esClient));
// must delete index templates and pipelines first
await Promise.all(deleteESAssets(indexTemplatesAndPipelines, esClient));
// then the other asset types
await Promise.all([
...deleteESAssets(otherAssets, esClient),

View file

@ -106,6 +106,7 @@ describe('storedPackagePoliciesToAgentPermissions()', () => {
transform: [],
index_template: [],
data_stream_ilm_policy: [],
ml_model: [],
},
},
data_streams: [
@ -217,6 +218,7 @@ describe('storedPackagePoliciesToAgentPermissions()', () => {
transform: [],
index_template: [],
data_stream_ilm_policy: [],
ml_model: [],
},
},
data_streams: [
@ -334,6 +336,7 @@ describe('storedPackagePoliciesToAgentPermissions()', () => {
transform: [],
index_template: [],
data_stream_ilm_policy: [],
ml_model: [],
},
},
});

View file

@ -295,6 +295,7 @@ export const response: GetInfoResponse['response'] = {
ilm_policy: [],
index_template: [],
transform: [],
ml_model: [],
},
},
policy_templates: [

View file

@ -124,6 +124,7 @@ export const response: GetInfoResponse['response'] = {
ilm_policy: [],
index_template: [],
transform: [],
ml_model: [],
},
},
policy_templates: [

View file

@ -155,6 +155,43 @@ export default function (providerContext: FtrProviderContext) {
);
expect(resPipeline2.statusCode).equal(404);
});
it('should have uninstalled the ml model', async function () {
const res = await es.transport.request(
{
method: 'GET',
path: `/_ml/trained_models/default`,
},
{
ignore: [404],
}
);
expect(res.statusCode).equal(404);
});
it('should have uninstalled the transforms', async function () {
const res = await es.transport.request(
{
method: 'GET',
path: `/_transform/${pkgName}-test-default-${pkgVersion}`,
},
{
ignore: [404],
}
);
expect(res.statusCode).equal(404);
});
it('should have deleted the index for the transform', async function () {
// the index is defined in the transform file
const res = await es.transport.request(
{
method: 'GET',
path: `/logs-all_assets.test_log_current_default`,
},
{
ignore: [404],
}
);
expect(res.statusCode).equal(404);
});
it('should have uninstalled the kibana assets', async function () {
let resDashboard;
try {
@ -338,6 +375,13 @@ const expectAssetsInstalled = ({
});
expect(resPipeline2.statusCode).equal(200);
});
it('should have installed the ml model', async function () {
const res = await es.transport.request({
method: 'GET',
path: `_ml/trained_models/default`,
});
expect(res.statusCode).equal(200);
});
it('should have installed the component templates', async function () {
const resMappings = await es.transport.request({
method: 'GET',
@ -545,6 +589,10 @@ const expectAssetsInstalled = ({
id: 'logs-all_assets.test_logs-0.1.0-pipeline2',
type: 'ingest_pipeline',
},
{
id: 'default',
type: 'ml_model',
},
],
es_index_patterns: {
test_logs: 'logs-all_assets.test_logs-*',
@ -563,6 +611,7 @@ const expectAssetsInstalled = ({
{ id: 'f839c76e-d194-555a-90a1-3265a45789e4', type: 'epm-packages-assets' },
{ id: '9af7bbb3-7d8a-50fa-acc9-9dde6f5efca2', type: 'epm-packages-assets' },
{ id: '1e97a20f-9d1c-529b-8ff2-da4e8ba8bb71', type: 'epm-packages-assets' },
{ id: 'ed5d54d5-2516-5d49-9e61-9508b0152d2b', type: 'epm-packages-assets' },
{ id: 'bd5ff3c5-655e-5385-9918-b60ff3040aad', type: 'epm-packages-assets' },
{ id: '0954ce3b-3165-5c1f-a4c0-56eb5f2fa487', type: 'epm-packages-assets' },
{ id: '60d6d054-57e4-590f-a580-52bf3f5e7cca', type: 'epm-packages-assets' },

View file

@ -349,6 +349,10 @@ export default function (providerContext: FtrProviderContext) {
id: 'logs-all_assets.test_logs-all_assets',
type: 'data_stream_ilm_policy',
},
{
id: 'default',
type: 'ml_model',
},
{
id: 'logs-all_assets.test_logs-0.2.0',
type: 'ingest_pipeline',
@ -416,6 +420,7 @@ export default function (providerContext: FtrProviderContext) {
{ id: '28523a82-1328-578d-84cb-800970560200', type: 'epm-packages-assets' },
{ id: 'cc1e3e1d-f27b-5d05-86f6-6e4b9a47c7dc', type: 'epm-packages-assets' },
{ id: '5c3aa147-089c-5084-beca-53c00e72ac80', type: 'epm-packages-assets' },
{ id: '0c8c3c6a-90cb-5f0e-8359-d807785b046c', type: 'epm-packages-assets' },
{ id: '48e582df-b1d2-5f88-b6ea-ba1fafd3a569', type: 'epm-packages-assets' },
{ id: 'bf3b0b65-9fdc-53c6-a9ca-e76140e56490', type: 'epm-packages-assets' },
{ id: '7f4c5aca-b4f5-5f0a-95af-051da37513fc', type: 'epm-packages-assets' },