[Enterprise Search] Remove processor from ML Inference Pipeline (#140560)

* Started on deleting ml inference pipelines.

* Add more tests and handle edge cases.

* Fixed comment.

* Hopefully correctly mock error being raised by Elasticsearch.

* Don't update pipeline if nothing changed.

* Use underscore in route path, to aligh with other routes.

* Update x-pack/plugins/enterprise_search/server/lib/indices/delete_ml_inference_pipeline.ts

Co-authored-by: Rodney Norris <rodney@tattdcodemonkey.com>

* Make sure id is included when updating pipeline.

* Reword.

* Attempt at testing the route - fail.

* Also test pipeline missing case.

* Fix test.

Co-authored-by: Rodney Norris <rodney@tattdcodemonkey.com>
This commit is contained in:
Irina Truong 2022-09-19 02:16:42 -07:00 committed by GitHub
parent e90ab44dc1
commit 1d2aa8a531
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 315 additions and 2 deletions

View file

@ -0,0 +1,118 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { errors } from '@elastic/elasticsearch';
import { ElasticsearchClient } from '@kbn/core/server';
import { deleteMlInferencePipeline } from './delete_ml_inference_pipeline';
describe('deleteMlInferencePipeline lib function', () => {
const mockClient = {
ingest: {
deletePipeline: jest.fn(),
getPipeline: jest.fn(),
putPipeline: jest.fn(),
},
};
beforeEach(() => {
jest.clearAllMocks();
});
const anyObject: any = {};
const notFoundResponse = { meta: { statusCode: 404 } };
const notFoundError = new errors.ResponseError({
body: notFoundResponse,
statusCode: 404,
headers: {},
meta: anyObject,
warnings: [],
});
const mockGetPipeline = {
'my-index@ml-inference': {
id: 'my-index@ml-inference',
processors: [
{
pipeline: {
name: 'my-ml-pipeline',
},
},
],
},
};
it('should delete pipeline', async () => {
mockClient.ingest.getPipeline.mockImplementation(() => Promise.resolve(mockGetPipeline));
mockClient.ingest.putPipeline.mockImplementation(() => Promise.resolve({ acknowledged: true }));
mockClient.ingest.deletePipeline.mockImplementation(() =>
Promise.resolve({ acknowledged: true })
);
const expectedResponse = { deleted: 'my-ml-pipeline', updated: 'my-index@ml-inference' };
const response = await deleteMlInferencePipeline(
'my-index',
'my-ml-pipeline',
mockClient as unknown as ElasticsearchClient
);
expect(response).toEqual(expectedResponse);
expect(mockClient.ingest.putPipeline).toHaveBeenCalledWith({
id: 'my-index@ml-inference',
processors: [],
});
expect(mockClient.ingest.deletePipeline).toHaveBeenCalledWith({
id: 'my-ml-pipeline',
});
});
it('should succeed when parent pipeline is missing', async () => {
mockClient.ingest.getPipeline.mockImplementation(() => Promise.reject(notFoundError));
mockClient.ingest.deletePipeline.mockImplementation(() =>
Promise.resolve({ acknowledged: true })
);
const expectedResponse = {
deleted: 'my-ml-pipeline',
};
const response = await deleteMlInferencePipeline(
'my-index',
'my-ml-pipeline',
mockClient as unknown as ElasticsearchClient
);
expect(response).toEqual(expectedResponse);
expect(mockClient.ingest.putPipeline).toHaveBeenCalledTimes(0);
expect(mockClient.ingest.deletePipeline).toHaveBeenCalledWith({
id: 'my-ml-pipeline',
});
});
it('should fail when pipeline is missing', async () => {
mockClient.ingest.getPipeline.mockImplementation(() => Promise.resolve(mockGetPipeline));
mockClient.ingest.deletePipeline.mockImplementation(() => Promise.reject(notFoundError));
await expect(
deleteMlInferencePipeline(
'my-index',
'my-ml-pipeline',
mockClient as unknown as ElasticsearchClient
)
).rejects.toThrow(Error);
expect(mockClient.ingest.putPipeline).toHaveBeenCalledWith({
id: 'my-index@ml-inference',
processors: [],
});
expect(mockClient.ingest.deletePipeline).toHaveBeenCalledWith({
id: 'my-ml-pipeline',
});
});
});

View file

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types';
import { ElasticsearchClient } from '@kbn/core/server';
/**
* Response for deleting sub-pipeline from @ml-inference pipeline.
* If sub-pipeline was deleted successfully, 'deleted' field contains its name.
* If parent pipeline was updated successfully, 'updated' field contains its name.
*/
export interface DeleteMlInferencePipelineResponse {
deleted?: string;
updated?: string;
}
export const deleteMlInferencePipeline = async (
indexName: string,
pipelineName: string,
client: ElasticsearchClient
) => {
const response: DeleteMlInferencePipelineResponse = {};
const parentPipelineId = `${indexName}@ml-inference`;
// find parent pipeline
try {
const pipelineResponse = await client.ingest.getPipeline({
id: parentPipelineId,
});
const parentPipeline = pipelineResponse[parentPipelineId];
if (parentPipeline !== undefined) {
// remove sub-pipeline from parent pipeline
if (parentPipeline.processors !== undefined) {
const updatedProcessors = parentPipeline.processors.filter(
(p) => !(p.pipeline !== undefined && p.pipeline.name === pipelineName)
);
// only update if we changed something
if (updatedProcessors.length !== parentPipeline.processors.length) {
const updatedPipeline: IngestPutPipelineRequest = {
...parentPipeline,
id: parentPipelineId,
processors: updatedProcessors,
};
const updateResponse = await client.ingest.putPipeline(updatedPipeline);
if (updateResponse.acknowledged === true) {
response.updated = parentPipelineId;
}
}
}
}
} catch (error) {
// only suppress Not Found error
if (error.meta?.statusCode !== 404) {
throw error;
}
}
// finally, delete pipeline
const deleteResponse = await client.ingest.deletePipeline({ id: pipelineName });
if (deleteResponse.acknowledged === true) {
response.deleted = pipelineName;
}
return response;
};

View file

@ -17,8 +17,14 @@ jest.mock('../../lib/indices/fetch_ml_inference_pipeline_processors', () => ({
jest.mock('../../utils/create_ml_inference_pipeline', () => ({
createAndReferenceMlInferencePipeline: jest.fn(),
}));
jest.mock('../../lib/indices/delete_ml_inference_pipeline', () => ({
deleteMlInferencePipeline: jest.fn(),
}));
import { deleteMlInferencePipeline } from '../../lib/indices/delete_ml_inference_pipeline';
import { fetchMlInferencePipelineProcessors } from '../../lib/indices/fetch_ml_inference_pipeline_processors';
import { createAndReferenceMlInferencePipeline } from '../../utils/create_ml_inference_pipeline';
import { ElasticsearchResponseError } from '../../utils/identify_exceptions';
import { registerIndexRoutes } from './indices';
@ -168,4 +174,72 @@ describe('Enterprise Search Managed Indices', () => {
);
});
});
describe('DELETE /internal/enterprise_search/indices/{indexName}/ml_inference/pipelines/{pipelineName}', () => {
const indexName = 'my-index';
const pipelineName = 'my-pipeline';
beforeEach(() => {
const context = {
core: Promise.resolve({ elasticsearch: { client: mockClient } }),
} as jest.Mocked<RequestHandlerContext>;
mockRouter = new MockRouter({
context,
method: 'delete',
path: '/internal/enterprise_search/indices/{indexName}/ml_inference/pipelines/{pipelineName}',
});
registerIndexRoutes({
...mockDependencies,
router: mockRouter.router,
});
});
it('fails validation without index_name', () => {
const request = { params: {} };
mockRouter.shouldThrow(request);
});
it('deletes pipeline', async () => {
const mockResponse = { deleted: pipelineName };
(deleteMlInferencePipeline as jest.Mock).mockImplementationOnce(() => {
return Promise.resolve(mockResponse);
});
await mockRouter.callRoute({
params: { indexName, pipelineName },
});
expect(deleteMlInferencePipeline).toHaveBeenCalledWith(indexName, pipelineName, {});
expect(mockRouter.response.ok).toHaveBeenCalledWith({
body: mockResponse,
headers: { 'content-type': 'application/json' },
});
});
it('raises error if deletion failed', async () => {
const errorReason = `pipeline is missing: [${pipelineName}]`;
const mockError = new Error(errorReason) as ElasticsearchResponseError;
mockError.meta = {
body: {
error: {
type: 'resource_not_found_exception',
},
},
};
(deleteMlInferencePipeline as jest.Mock).mockImplementationOnce(() => {
return Promise.reject(mockError);
});
await mockRouter.callRoute({
params: { indexName, pipelineName },
});
expect(deleteMlInferencePipeline).toHaveBeenCalledWith(indexName, pipelineName, {});
expect(mockRouter.response.customError).toHaveBeenCalledTimes(1);
});
});
});

View file

@ -16,6 +16,7 @@ import { fetchConnectorByIndexName, fetchConnectors } from '../../lib/connectors
import { fetchCrawlerByIndexName, fetchCrawlers } from '../../lib/crawler/fetch_crawlers';
import { createIndex } from '../../lib/indices/create_index';
import { deleteMlInferencePipeline } from '../../lib/indices/delete_ml_inference_pipeline';
import { fetchIndex } from '../../lib/indices/fetch_index';
import { fetchIndices } from '../../lib/indices/fetch_indices';
import { fetchMlInferencePipelineProcessors } from '../../lib/indices/fetch_ml_inference_pipeline_processors';
@ -29,7 +30,10 @@ import {
CreatedPipeline,
} from '../../utils/create_ml_inference_pipeline';
import { elasticsearchErrorHandler } from '../../utils/elasticsearch_error_handler';
import { isIndexNotFoundException } from '../../utils/identify_exceptions';
import {
isIndexNotFoundException,
isResourceNotFoundException,
} from '../../utils/identify_exceptions';
export function registerIndexRoutes({
router,
@ -448,4 +452,46 @@ export function registerIndexRoutes({
});
})
);
router.delete(
{
path: '/internal/enterprise_search/indices/{indexName}/ml_inference/pipelines/{pipelineName}',
validate: {
params: schema.object({
indexName: schema.string(),
pipelineName: schema.string(),
}),
},
},
elasticsearchErrorHandler(log, async (context, request, response) => {
const indexName = decodeURIComponent(request.params.indexName);
const pipelineName = decodeURIComponent(request.params.pipelineName);
const { client } = (await context.core).elasticsearch;
try {
const deleteResult = await deleteMlInferencePipeline(
indexName,
pipelineName,
client.asCurrentUser
);
return response.ok({
body: deleteResult,
headers: { 'content-type': 'application/json' },
});
} catch (error) {
if (isResourceNotFoundException(error)) {
// return specific message if pipeline doesn't exist
return createError({
errorCode: ErrorCode.RESOURCE_NOT_FOUND,
message: error.meta?.body?.error?.reason,
response,
statusCode: 404,
});
}
// otherwise, let the default handler wrap it
throw error;
}
})
);
}

View file

@ -5,7 +5,7 @@
* 2.0.
*/
interface ElasticsearchResponseError {
export interface ElasticsearchResponseError {
meta?: {
body?: {
error?: {
@ -23,5 +23,8 @@ export const isIndexNotFoundException = (error: ElasticsearchResponseError) =>
export const isResourceAlreadyExistsException = (error: ElasticsearchResponseError) =>
error?.meta?.body?.error?.type === 'resource_already_exists_exception';
export const isResourceNotFoundException = (error: ElasticsearchResponseError) =>
error?.meta?.body?.error?.type === 'resource_not_found_exception';
export const isUnauthorizedException = (error: ElasticsearchResponseError) =>
error.meta?.statusCode === 403;