[ML] Add data drift detection workflow from Trained models to Data comparison view (#162853)

## Summary

This PR adds a data drift detection workflow from the Trained models list
to the Data comparison view. It also renames Data comparison to Data Drift.

**From the new map view in the Trained models list:**

- Clicking the index icon in the map view offers an Analyze data drift
action



- If the model has detected related indices, an Analyze data drift option
is also shown in the Transform actions

**From the data comparison/drift page:**

- A default screen with the list of available data views and saved
searches is shown

<img width="1470" alt="Screen Shot 2023-09-07 at 00 22 01"
src="db13b8b7-9d90-4220-b03e-9f9d12ab53e9">

- Users can also customize the index patterns for the data sets to
analyze. Upon 'analyzing', a new data view will be created if needed
(either permanently or temporarily).

<img width="1271" alt="Screen Shot 2023-08-29 at 16 56 57"
src="e000e920-162b-4369-8762-70b6244e50e7">

<img width="1470" alt="Screen Shot 2023-09-07 at 00 22 49"
src="6577a530-c3b0-4ab9-95e4-d1d8fd1c9f0a">

- If a data view already exists with the exact combination of index
patterns and time field, that data view will be reused
- If a data view exists with the same index patterns but a different
time field, a new data view will be created with the name
`{referencePattern},{comparisonPattern}-{timeField}`
- If no matching data view exists, a new data view will be created with
the name `{referencePattern},{comparisonPattern}`
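The resolution rules above can be sketched as a small pure function. This is an illustrative sketch only; the function name, the `DataViewInfo` shape, and representing a data view's index patterns as a comma-separated `title` string are assumptions, not the actual Kibana implementation.

```typescript
// Hypothetical sketch of the data view resolution rules described above.
interface DataViewInfo {
  title: string; // comma-separated index patterns, e.g. "ref-*,comp-*"
  timeFieldName?: string;
}

function resolveDataViewName(
  referencePattern: string,
  comparisonPattern: string,
  timeField: string,
  existingViews: DataViewInfo[]
): string {
  const combinedPatterns = `${referencePattern},${comparisonPattern}`;
  // 1. Exact match on index patterns and time field: reuse that data view.
  const exact = existingViews.find(
    (v) => v.title === combinedPatterns && v.timeFieldName === timeField
  );
  if (exact) return exact.title;
  // 2. Same patterns, different time field: new view suffixed with the time field.
  const samePatterns = existingViews.find((v) => v.title === combinedPatterns);
  if (samePatterns) return `${combinedPatterns}-${timeField}`;
  // 3. No match at all: new view named after the patterns alone.
  return combinedPatterns;
}
```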


## For reviewers:
- **appex-sharedux**: [Small change in the exported type interface for
BaseSavedObjectFinder](https://github.com/elastic/kibana/pull/162853/files#diff-5e2e62df8aba5ac9445962bfa00eee933a386110d0a24dfe6ac0f300a796ccc3)
to correctly list `children` as an accepted prop. This prop is used for
`toolsRight`.
- **security-solution**: Renaming of `Data comparison` to `Data Drift`
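The `BaseSavedObjectFinder` type change can be pictured roughly as below. This is an illustrative sketch; everything except the `children` prop and its `toolsRight` purpose is an assumption, not the real interface.

```typescript
// Illustrative only: the real interface lives in the shared saved-object
// finder package; other props are elided here.
type BaseSavedObjectFinderProps = {
  // ...existing props elided...
  /** Content rendered as the `toolsRight` of the search bar. */
  children?: unknown; // `ReactNode` in the actual React component
};

// With `children` listed on the type, passing it no longer fails type checks:
const withTools: BaseSavedObjectFinderProps = { children: 'custom tools' };
```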

## Tests:
[Flaky test suite runner with Data Drift
test](https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/3216#018accc2-d33b-4cd6-a178-589e6698b675)
... successful after 50 runs


### Checklist

Delete any items that are not applicable to this PR.

- [ ] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] Any UI touched in this PR is usable by keyboard only (learn more
about [keyboard accessibility](https://webaim.org/techniques/keyboard/))
- [ ] Any UI touched in this PR does not create any new axe failures
(run axe in browser:
[FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/),
[Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))
- [ ] If a plugin configuration key changed, check if it needs to be
allowlisted in the cloud and added to the [docker
list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)
- [ ] This renders correctly on smaller devices using a responsive
layout. (You can test this [in your
browser](https://www.browserstack.com/guide/responsive-testing-on-local-server))
- [ ] This was checked for [cross-browser
compatibility](https://www.elastic.co/support/matrix#matrix_browsers)


### Risk Matrix

Delete this section if it is not applicable to this PR.

Before closing this PR, invite QA, stakeholders, and other developers to
identify risks that should be tested prior to the change/feature
release.

When forming the risk matrix, consider some of the following examples
and how they may potentially impact the change:

| Risk | Probability | Severity | Mitigation/Notes |
|---------------------------|-------------|----------|-------------------------|
| Multiple Spaces&mdash;unexpected behavior in non-default Kibana Space. | Low | High | Integration tests will verify that all features are still supported in non-default Kibana Space and when user switches between spaces. |
| Multiple nodes&mdash;Elasticsearch polling might have race conditions when multiple Kibana nodes are polling for the same tasks. | High | Low | Tasks are idempotent, so executing them multiple times will not result in logical error, but will degrade performance. To test for this case we add plenty of unit tests around this logic and document manual testing procedure. |
| Code should gracefully handle cases when feature X or plugin Y are disabled. | Medium | High | Unit tests will verify that any feature flag or plugin combination still results in our service operational. |
| [See more potential risk examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) | | | |


### For maintainers

- [ ] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Commit 4dfd31def0 (parent a338dd8b38) by Quynh Nguyen (Quinn), 2023-09-26 17:15:35 -05:00, committed by GitHub.
96 changed files with 3210 additions and 1020 deletions


@@ -18,9 +18,8 @@ import {
type AnalyticsMapNodeElement,
type MapElements,
} from '@kbn/ml-data-frame-analytics-utils';
import type { TransformGetTransformTransformSummary } from '@elastic/elasticsearch/lib/api/types';
import { flatten } from 'lodash';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import type { ModelService } from '../model_management/models_provider';
import { modelsProvider } from '../model_management';
import {
type ExtendAnalyticsMapArgs,
@@ -43,13 +42,15 @@ import { DEFAULT_TRAINED_MODELS_PAGE_SIZE } from '../../routes/trained_models';
export class AnalyticsManager {
private _trainedModels: estypes.MlTrainedModelConfig[] = [];
private _jobs: estypes.MlDataframeAnalyticsSummary[] = [];
private _transforms?: TransformGetTransformTransformSummary[];
private _modelsProvider: ModelService;
constructor(
private readonly _mlClient: MlClient,
private readonly _client: IScopedClusterClient,
private readonly _enabledFeatures: MlFeatures
) {}
) {
this._modelsProvider = modelsProvider(this._client);
}
private async initData() {
const [models, jobs] = await Promise.all([
@@ -64,30 +65,6 @@ export class AnalyticsManager {
this._jobs = jobs.data_frame_analytics;
}
private async initTransformData() {
if (!this._transforms) {
try {
const body = await this._client.asCurrentUser.transform.getTransform({
size: 1000,
});
this._transforms = body.transforms;
return body.transforms;
} catch (e) {
if (e.meta?.statusCode !== 403) {
// eslint-disable-next-line no-console
console.error(e);
}
}
}
}
private getNodeId(
elementOriginalId: string,
nodeType: typeof JOB_MAP_NODE_TYPES[keyof typeof JOB_MAP_NODE_TYPES]
): string {
return `${elementOriginalId}-${nodeType}`;
}
private isDuplicateElement(analyticsId: string, elements: MapElements[]): boolean {
let isDuplicate = false;
elements.forEach((elem) => {
@@ -608,8 +585,12 @@ export class AnalyticsManager {
}
if (modelId && model) {
// First, find information about the trained model
result.elements.push({
const pipelinesAndIndicesResults =
await this._modelsProvider.getModelsPipelinesAndIndicesMap(modelId, {
withIndices: true,
});
// Adding information about the trained model
pipelinesAndIndicesResults.elements.push({
data: {
id: modelNodeId,
label: modelId,
@@ -617,182 +598,9 @@ export class AnalyticsManager {
isRoot: true,
},
});
result.details[modelNodeId] = model;
pipelinesAndIndicesResults.details[modelNodeId] = model;
let pipelinesResponse;
let indicesSettings;
try {
// Then, find the pipelines that have the trained model set as index.default_pipelines
pipelinesResponse = await modelsProvider(this._client).getModelsPipelines([modelId]);
} catch (e) {
// Possible that the user doesn't have permissions to view ingest pipelines
// If so, gracefully exit
if (e.meta?.statusCode !== 403) {
// eslint-disable-next-line no-console
console.error(e);
}
return result;
}
const pipelines = pipelinesResponse?.get(modelId);
if (pipelines) {
const pipelineIds = new Set(Object.keys(pipelines));
for (const pipelineId of pipelineIds) {
const pipelineNodeId = `${pipelineId}-${JOB_MAP_NODE_TYPES.INGEST_PIPELINE}`;
result.details[pipelineNodeId] = pipelines[pipelineId];
result.elements.push({
data: {
id: pipelineNodeId,
label: pipelineId,
type: JOB_MAP_NODE_TYPES.INGEST_PIPELINE,
},
});
result.elements.push({
data: {
id: `${modelNodeId}~${pipelineNodeId}`,
source: modelNodeId,
target: pipelineNodeId,
},
});
}
const pipelineIdsToDestinationIndices: Record<string, string[]> = {};
let indicesPermissions;
try {
indicesSettings = await this._client.asInternalUser.indices.getSettings();
const hasPrivilegesResponse = await this._client.asCurrentUser.security.hasPrivileges({
index: [
{
names: Object.keys(indicesSettings),
privileges: ['read'],
},
],
});
indicesPermissions = hasPrivilegesResponse.index;
} catch (e) {
// Possible that the user doesn't have permissions to view
// If so, gracefully exit
if (e.meta?.statusCode !== 403) {
// eslint-disable-next-line no-console
console.error(e);
}
return result;
}
for (const [indexName, { settings }] of Object.entries(indicesSettings)) {
if (
settings?.index?.default_pipeline &&
pipelineIds.has(settings.index.default_pipeline) &&
indicesPermissions[indexName]?.read === true
) {
if (Array.isArray(pipelineIdsToDestinationIndices[settings.index.default_pipeline])) {
pipelineIdsToDestinationIndices[settings.index.default_pipeline].push(indexName);
} else {
pipelineIdsToDestinationIndices[settings.index.default_pipeline] = [indexName];
}
}
}
for (const [pipelineId, indexIds] of Object.entries(pipelineIdsToDestinationIndices)) {
const pipelineNodeId = this.getNodeId(pipelineId, JOB_MAP_NODE_TYPES.INGEST_PIPELINE);
for (const destinationIndexId of indexIds) {
const destinationIndexNodeId = this.getNodeId(
destinationIndexId,
JOB_MAP_NODE_TYPES.INDEX
);
const destinationIndexDetails = await this.getIndexData(destinationIndexId);
result.details[destinationIndexNodeId] = {
...destinationIndexDetails,
ml_inference_models: [modelId],
};
result.elements.push({
data: {
id: destinationIndexNodeId,
label: destinationIndexId,
type: JOB_MAP_NODE_TYPES.INDEX,
},
});
result.elements.push({
data: {
id: `${pipelineNodeId}~${destinationIndexNodeId}`,
source: pipelineNodeId,
target: destinationIndexNodeId,
},
});
}
}
const destinationIndices = flatten(Object.values(pipelineIdsToDestinationIndices));
// From these destination indices, see if there's any transforms that have the indexId as the source destination index
if (destinationIndices.length > 0) {
const transforms = await this.initTransformData();
if (!transforms) return result;
for (const destinationIndex of destinationIndices) {
const destinationIndexNodeId = `${destinationIndex}-${JOB_MAP_NODE_TYPES.INDEX}`;
const foundTransform = transforms?.find((t) => {
const transformSourceIndex = Array.isArray(t.source.index)
? t.source.index[0]
: t.source.index;
return transformSourceIndex === destinationIndex;
});
if (foundTransform) {
const transformDestIndex = foundTransform.dest.index;
const transformNodeId = `${foundTransform.id}-${JOB_MAP_NODE_TYPES.TRANSFORM}`;
const transformDestIndexNodeId = `${transformDestIndex}-${JOB_MAP_NODE_TYPES.INDEX}`;
const destIndex = await this.getIndexData(transformDestIndex);
result.details[transformNodeId] = foundTransform;
result.details[transformDestIndexNodeId] = destIndex;
result.elements.push(
{
data: {
id: transformNodeId,
label: foundTransform.id,
type: JOB_MAP_NODE_TYPES.TRANSFORM,
},
},
{
data: {
id: transformDestIndexNodeId,
label: transformDestIndex,
type: JOB_MAP_NODE_TYPES.INDEX,
},
}
);
result.elements.push(
{
data: {
id: `${destinationIndexNodeId}~${transformNodeId}`,
source: destinationIndexNodeId,
target: transformNodeId,
},
},
{
data: {
id: `${transformNodeId}~${transformDestIndexNodeId}`,
source: transformNodeId,
target: transformDestIndexNodeId,
},
}
);
}
}
}
}
return pipelinesAndIndicesResults;
}
} catch (error) {
result.error = error.message || 'An error occurred fetching map';


@@ -6,6 +6,10 @@
*/
import type { IScopedClusterClient } from '@kbn/core/server';
import { JOB_MAP_NODE_TYPES, type MapElements } from '@kbn/ml-data-frame-analytics-utils';
import { flatten } from 'lodash';
import type { TransformGetTransformTransformSummary } from '@elastic/elasticsearch/lib/api/types';
import type { IndexName, IndicesIndexState } from '@elastic/elasticsearch/lib/api/types';
import type {
IngestPipeline,
IngestSimulateDocument,
@@ -21,197 +25,472 @@ import type { CloudSetup } from '@kbn/cloud-plugin/server';
import type { PipelineDefinition } from '../../../common/types/trained_models';
export type ModelService = ReturnType<typeof modelsProvider>;
export const modelsProvider = (client: IScopedClusterClient, cloud?: CloudSetup) =>
new ModelsProvider(client, cloud);
export function modelsProvider(client: IScopedClusterClient, cloud?: CloudSetup) {
return {
/**
* Retrieves the map of model ids and aliases with associated pipelines.
* @param modelIds - Array of models ids and model aliases.
*/
async getModelsPipelines(modelIds: string[]) {
const modelIdsMap = new Map<string, Record<string, PipelineDefinition> | null>(
modelIds.map((id: string) => [id, null])
);
interface ModelMapResult {
ingestPipelines: Map<string, Record<string, PipelineDefinition> | null>;
indices: Array<Record<IndexName, IndicesIndexState | null>>;
/**
* Map elements
*/
elements: MapElements[];
/**
* Transform, job or index details
*/
details: Record<string, any>;
/**
* Error
*/
error: null | any;
}
export class ModelsProvider {
private _transforms?: TransformGetTransformTransformSummary[];
constructor(private _client: IScopedClusterClient, private _cloud?: CloudSetup) {}
private async initTransformData() {
if (!this._transforms) {
try {
const body = await client.asCurrentUser.ingest.getPipeline();
const body = await this._client.asCurrentUser.transform.getTransform({
size: 1000,
});
this._transforms = body.transforms;
return body.transforms;
} catch (e) {
if (e.meta?.statusCode !== 403) {
// eslint-disable-next-line no-console
console.error(e);
}
}
}
}
for (const [pipelineName, pipelineDefinition] of Object.entries(body)) {
const { processors } = pipelineDefinition as { processors: Array<Record<string, any>> };
private async getIndexData(index: string): Promise<Record<IndexName, IndicesIndexState | null>> {
try {
const indexData = await this._client.asInternalUser.indices.get({
index,
});
return indexData;
} catch (e) {
// Possible that the user doesn't have permissions to view
// If so, gracefully exit
if (e.meta?.statusCode !== 403) {
// eslint-disable-next-line no-console
console.error(e);
}
return { [index]: null };
}
}
private getNodeId(
elementOriginalId: string,
nodeType: typeof JOB_MAP_NODE_TYPES[keyof typeof JOB_MAP_NODE_TYPES]
): string {
return `${elementOriginalId}-${nodeType}`;
}
for (const processor of processors) {
const id = processor.inference?.model_id;
if (modelIdsMap.has(id)) {
const obj = modelIdsMap.get(id);
if (obj === null) {
modelIdsMap.set(id, { [pipelineName]: pipelineDefinition });
/**
* Simulates the effect of the pipeline on given document.
*
*/
async simulatePipeline(docs: IngestSimulateDocument[], pipelineConfig: IngestPipeline) {
const simulateRequest: IngestSimulateRequest = {
docs,
pipeline: pipelineConfig,
};
let result = {};
try {
result = await this._client.asCurrentUser.ingest.simulate(simulateRequest);
} catch (error) {
if (error.statusCode === 404) {
// ES returns 404 when there are no pipelines
// Instead, we should return an empty response and a 200
return result;
}
throw error;
}
return result;
}
/**
* Creates the pipeline
*
*/
async createInferencePipeline(pipelineConfig: IngestPipeline, pipelineName: string) {
let result = {};
result = await this._client.asCurrentUser.ingest.putPipeline({
id: pipelineName,
...pipelineConfig,
});
return result;
}
/**
* Retrieves existing pipelines.
*
*/
async getPipelines() {
let result = {};
try {
result = await this._client.asCurrentUser.ingest.getPipeline();
} catch (error) {
if (error.statusCode === 404) {
// ES returns 404 when there are no pipelines
// Instead, we should return an empty response and a 200
return result;
}
throw error;
}
return result;
}
/**
* Retrieves the map of model ids and aliases with associated pipelines.
* @param modelIds - Array of models ids and model aliases.
*/
async getModelsPipelines(modelIds: string[]) {
const modelIdsMap = new Map<string, Record<string, PipelineDefinition> | null>(
modelIds.map((id: string) => [id, null])
);
try {
const body = await this._client.asCurrentUser.ingest.getPipeline();
for (const [pipelineName, pipelineDefinition] of Object.entries(body)) {
const { processors } = pipelineDefinition as { processors: Array<Record<string, any>> };
for (const processor of processors) {
const id = processor.inference?.model_id;
if (modelIdsMap.has(id)) {
const obj = modelIdsMap.get(id);
if (obj === null) {
modelIdsMap.set(id, { [pipelineName]: pipelineDefinition });
} else {
obj![pipelineName] = pipelineDefinition;
}
}
}
}
} catch (error) {
if (error.statusCode === 404) {
// ES returns 404 when there are no pipelines
// Instead, we should return the modelIdsMap and a 200
return modelIdsMap;
}
throw error;
}
return modelIdsMap;
}
/**
* Retrieves the network map and metadata of model ids, pipelines, and indices that are tied to the model ids.
* @param modelIds - Array of models ids and model aliases.
*/
async getModelsPipelinesAndIndicesMap(
modelId: string,
{
withIndices,
}: {
withIndices: boolean;
}
): Promise<ModelMapResult> {
const result: ModelMapResult = {
ingestPipelines: new Map(),
indices: [],
elements: [],
details: {},
error: null,
};
let pipelinesResponse;
let indicesSettings;
try {
pipelinesResponse = await this.getModelsPipelines([modelId]);
// 1. Get list of pipelines that are related to the model
const pipelines = pipelinesResponse?.get(modelId);
const modelNodeId = this.getNodeId(modelId, JOB_MAP_NODE_TYPES.TRAINED_MODEL);
if (pipelines) {
const pipelineIds = new Set(Object.keys(pipelines));
result.ingestPipelines = pipelinesResponse;
for (const pipelineId of pipelineIds) {
const pipelineNodeId = this.getNodeId(pipelineId, JOB_MAP_NODE_TYPES.INGEST_PIPELINE);
result.details[pipelineNodeId] = pipelines[pipelineId];
result.elements.push({
data: {
id: pipelineNodeId,
label: pipelineId,
type: JOB_MAP_NODE_TYPES.INGEST_PIPELINE,
},
});
result.elements.push({
data: {
id: `${modelNodeId}~${pipelineNodeId}`,
source: modelNodeId,
target: pipelineNodeId,
},
});
}
if (withIndices === true) {
const pipelineIdsToDestinationIndices: Record<string, string[]> = {};
let indicesPermissions;
try {
indicesSettings = await this._client.asInternalUser.indices.getSettings();
const hasPrivilegesResponse = await this._client.asCurrentUser.security.hasPrivileges({
index: [
{
names: Object.keys(indicesSettings),
privileges: ['read'],
},
],
});
indicesPermissions = hasPrivilegesResponse.index;
} catch (e) {
// Possible that the user doesn't have permissions to view
// If so, gracefully exit
if (e.meta?.statusCode !== 403) {
// eslint-disable-next-line no-console
console.error(e);
}
return result;
}
// 2. From list of model pipelines, find all indices that have pipeline set as index.default_pipeline
for (const [indexName, { settings }] of Object.entries(indicesSettings)) {
if (
settings?.index?.default_pipeline &&
pipelineIds.has(settings.index.default_pipeline) &&
indicesPermissions[indexName]?.read === true
) {
if (Array.isArray(pipelineIdsToDestinationIndices[settings.index.default_pipeline])) {
pipelineIdsToDestinationIndices[settings.index.default_pipeline].push(indexName);
} else {
obj![pipelineName] = pipelineDefinition;
pipelineIdsToDestinationIndices[settings.index.default_pipeline] = [indexName];
}
}
}
// 3. Grab index information for all the indices found, and add their info to the map
for (const [pipelineId, indexIds] of Object.entries(pipelineIdsToDestinationIndices)) {
const pipelineNodeId = this.getNodeId(pipelineId, JOB_MAP_NODE_TYPES.INGEST_PIPELINE);
for (const destinationIndexId of indexIds) {
const destinationIndexNodeId = this.getNodeId(
destinationIndexId,
JOB_MAP_NODE_TYPES.INDEX
);
const destinationIndexDetails = await this.getIndexData(destinationIndexId);
result.indices.push(destinationIndexDetails);
result.details[destinationIndexNodeId] = {
...destinationIndexDetails,
ml_inference_models: [modelId],
};
result.elements.push({
data: {
id: destinationIndexNodeId,
label: destinationIndexId,
type: JOB_MAP_NODE_TYPES.INDEX,
},
});
result.elements.push({
data: {
id: `${pipelineNodeId}~${destinationIndexNodeId}`,
source: pipelineNodeId,
target: destinationIndexNodeId,
},
});
}
}
const destinationIndices = flatten(Object.values(pipelineIdsToDestinationIndices));
// 4. From these destination indices, check if there's any transforms that have the indexId as the source destination index
if (destinationIndices.length > 0) {
const transforms = await this.initTransformData();
if (!transforms) return result;
for (const destinationIndex of destinationIndices) {
const destinationIndexNodeId = `${destinationIndex}-${JOB_MAP_NODE_TYPES.INDEX}`;
const foundTransform = transforms?.find((t) => {
const transformSourceIndex = Array.isArray(t.source.index)
? t.source.index[0]
: t.source.index;
return transformSourceIndex === destinationIndex;
});
// 5. If any of the transforms use these indices as source , find the destination indices to complete the map
if (foundTransform) {
const transformDestIndex = foundTransform.dest.index;
const transformNodeId = `${foundTransform.id}-${JOB_MAP_NODE_TYPES.TRANSFORM}`;
const transformDestIndexNodeId = `${transformDestIndex}-${JOB_MAP_NODE_TYPES.INDEX}`;
const destIndex = await this.getIndexData(transformDestIndex);
result.indices.push(destIndex);
result.details[transformNodeId] = foundTransform;
result.details[transformDestIndexNodeId] = destIndex;
result.elements.push(
{
data: {
id: transformNodeId,
label: foundTransform.id,
type: JOB_MAP_NODE_TYPES.TRANSFORM,
},
},
{
data: {
id: transformDestIndexNodeId,
label: transformDestIndex,
type: JOB_MAP_NODE_TYPES.INDEX,
},
}
);
result.elements.push(
{
data: {
id: `${destinationIndexNodeId}~${transformNodeId}`,
source: destinationIndexNodeId,
target: transformNodeId,
},
},
{
data: {
id: `${transformNodeId}~${transformDestIndexNodeId}`,
source: transformNodeId,
target: transformDestIndexNodeId,
},
}
);
}
}
}
}
} catch (error) {
if (error.statusCode === 404) {
// ES returns 404 when there are no pipelines
// Instead, we should return the modelIdsMap and a 200
return modelIdsMap;
}
throw error;
}
return modelIdsMap;
},
/**
* Deletes associated pipelines of the requested model
* @param modelIds
*/
async deleteModelPipelines(modelIds: string[]) {
const pipelines = await this.getModelsPipelines(modelIds);
const pipelinesIds: string[] = [
...new Set([...pipelines.values()].flatMap((v) => Object.keys(v!))),
];
await Promise.all(
pipelinesIds.map((id) => client.asCurrentUser.ingest.deletePipeline({ id }))
);
},
/**
* Simulates the effect of the pipeline on given document.
*
*/
async simulatePipeline(docs: IngestSimulateDocument[], pipelineConfig: IngestPipeline) {
const simulateRequest: IngestSimulateRequest = {
docs,
pipeline: pipelineConfig,
};
let result = {};
try {
result = await client.asCurrentUser.ingest.simulate(simulateRequest);
} catch (error) {
if (error.statusCode === 404) {
// ES returns 404 when there are no pipelines
// Instead, we should return an empty response and a 200
return result;
}
throw error;
}
return result;
},
} catch (error) {
if (error.statusCode === 404) {
// ES returns 404 when there are no pipelines
// Instead, we should return the modelIdsMap and a 200
return result;
}
throw error;
}
/**
* Creates the pipeline
*
*/
async createInferencePipeline(pipelineConfig: IngestPipeline, pipelineName: string) {
let result = {};
return result;
}
result = await client.asCurrentUser.ingest.putPipeline({
id: pipelineName,
...pipelineConfig,
/**
* Deletes associated pipelines of the requested model
* @param modelIds
*/
async deleteModelPipelines(modelIds: string[]) {
const pipelines = await this.getModelsPipelines(modelIds);
const pipelinesIds: string[] = [
...new Set([...pipelines.values()].flatMap((v) => Object.keys(v!))),
];
await Promise.all(
pipelinesIds.map((id) => this._client.asCurrentUser.ingest.deletePipeline({ id }))
);
}
/**
* Returns a list of elastic curated models available for download.
*/
async getModelDownloads(): Promise<ModelDefinitionResponse[]> {
// We assume that ML nodes in Cloud are always on linux-x86_64, even if other node types aren't.
const isCloud = !!this._cloud?.cloudId;
const nodesInfoResponse =
await this._client.asInternalUser.transport.request<NodesInfoResponseBase>({
method: 'GET',
path: `/_nodes/ml:true/os`,
});
return result;
},
/**
* Retrieves existing pipelines.
*
*/
async getPipelines() {
let result = {};
try {
result = await client.asCurrentUser.ingest.getPipeline();
} catch (error) {
if (error.statusCode === 404) {
// ES returns 404 when there are no pipelines
// Instead, we should return an empty response and a 200
return result;
}
throw error;
let osName: string | undefined;
let arch: string | undefined;
// Indicates that all ML nodes have the same architecture
let sameArch = true;
for (const node of Object.values(nodesInfoResponse.nodes)) {
if (!osName) {
osName = node.os?.name;
}
if (!arch) {
arch = node.os?.arch;
}
if (node.os?.name !== osName || node.os?.arch !== arch) {
sameArch = false;
break;
}
}
return result;
},
const result = Object.entries(ELASTIC_MODEL_DEFINITIONS).map(([name, def]) => {
const recommended =
(isCloud && def.os === 'Linux' && def.arch === 'amd64') ||
(sameArch && !!def?.os && def?.os === osName && def?.arch === arch);
return {
...def,
name,
...(recommended ? { recommended } : {}),
};
});
/**
* Returns a list of elastic curated models available for download.
*/
async getModelDownloads(): Promise<ModelDefinitionResponse[]> {
// We assume that ML nodes in Cloud are always on linux-x86_64, even if other node types aren't.
const isCloud = !!cloud?.cloudId;
return result;
}
const nodesInfoResponse =
await client.asInternalUser.transport.request<NodesInfoResponseBase>({
method: 'GET',
path: `/_nodes/ml:true/os`,
});
/**
* Provides an ELSER model name and configuration for download based on the current cluster architecture.
* The current default version is 2. If running on Cloud it returns the Linux x86_64 optimized version.
* If any of the ML nodes run a different OS rather than Linux, or the CPU architecture isn't x86_64,
* a portable version of the model is returned.
*/
async getELSER(options?: GetElserOptions): Promise<ModelDefinitionResponse> | never {
const modelDownloadConfig = await this.getModelDownloads();
let osName: string | undefined;
let arch: string | undefined;
// Indicates that all ML nodes have the same architecture
let sameArch = true;
for (const node of Object.values(nodesInfoResponse.nodes)) {
if (!osName) {
osName = node.os?.name;
}
if (!arch) {
arch = node.os?.arch;
}
if (node.os?.name !== osName || node.os?.arch !== arch) {
sameArch = false;
let requestedModel: ModelDefinitionResponse | undefined;
let recommendedModel: ModelDefinitionResponse | undefined;
let defaultModel: ModelDefinitionResponse | undefined;
for (const model of modelDownloadConfig) {
if (options?.version === model.version) {
requestedModel = model;
if (model.recommended) {
requestedModel = model;
break;
}
} else if (model.recommended) {
recommendedModel = model;
} else if (model.default) {
defaultModel = model;
}
}
const result = Object.entries(ELASTIC_MODEL_DEFINITIONS).map(([name, def]) => {
const recommended =
(isCloud && def.os === 'Linux' && def.arch === 'amd64') ||
(sameArch && !!def?.os && def?.os === osName && def?.arch === arch);
return {
...def,
name,
...(recommended ? { recommended } : {}),
};
});
if (!requestedModel && !defaultModel && !recommendedModel) {
throw new Error('Requested model not found');
}
return result;
},
/**
* Provides an ELSER model name and configuration for download based on the current cluster architecture.
* The current default version is 2. If running on Cloud it returns the Linux x86_64 optimized version.
* If any of the ML nodes run a different OS rather than Linux, or the CPU architecture isn't x86_64,
* a portable version of the model is returned.
*/
async getELSER(options?: GetElserOptions): Promise<ModelDefinitionResponse> | never {
const modelDownloadConfig = await this.getModelDownloads();
let requestedModel: ModelDefinitionResponse | undefined;
let recommendedModel: ModelDefinitionResponse | undefined;
let defaultModel: ModelDefinitionResponse | undefined;
for (const model of modelDownloadConfig) {
if (options?.version === model.version) {
requestedModel = model;
if (model.recommended) {
requestedModel = model;
break;
}
} else if (model.recommended) {
recommendedModel = model;
} else if (model.default) {
defaultModel = model;
}
}
if (!requestedModel && !defaultModel && !recommendedModel) {
throw new Error('Requested model not found');
}
return requestedModel || recommendedModel || defaultModel!;
},
};
return requestedModel || recommendedModel || defaultModel!;
}
}
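The index-discovery step of `getModelsPipelinesAndIndicesMap` (step 2 in the comments above: map each model pipeline to the indices that use it as `index.default_pipeline`, filtered by read permission) can be sketched as a self-contained pure function. Data shapes here are simplified stand-ins, not the Elasticsearch client types.

```typescript
// Simplified, illustrative sketch of the pipeline-to-index mapping step.
interface IndexSettings {
  settings?: { index?: { default_pipeline?: string } };
}

function mapPipelinesToIndices(
  pipelineIds: Set<string>,
  indicesSettings: Record<string, IndexSettings>,
  readableIndices: Set<string>
): Record<string, string[]> {
  const result: Record<string, string[]> = {};
  for (const [indexName, { settings }] of Object.entries(indicesSettings)) {
    const pipeline = settings?.index?.default_pipeline;
    // Keep only indices whose default pipeline belongs to the model and
    // that the current user is allowed to read.
    if (pipeline && pipelineIds.has(pipeline) && readableIndices.has(indexName)) {
      if (!result[pipeline]) result[pipeline] = [];
      result[pipeline].push(indexName);
    }
  }
  return result;
}
```

The resulting record is what the real code iterates over to add index nodes and pipeline-to-index edges to the map, before checking transforms whose source matches any of these destination indices.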