[8.6][ML Inference] Verify pipeline usage before deletion (#144053)

* Add validation of pipeline usage before deletion
This commit is contained in:
Adam Demjen 2022-10-28 16:13:18 -04:00 committed by GitHub
parent fe2480d96d
commit 7671176714
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 138 additions and 11 deletions

View file

@ -14,6 +14,7 @@ export enum ErrorCode {
INDEX_ALREADY_EXISTS = 'index_already_exists',
INDEX_NOT_FOUND = 'index_not_found',
PIPELINE_ALREADY_EXISTS = 'pipeline_already_exists',
PIPELINE_IS_IN_USE = 'pipeline_is_in_use',
RESOURCE_NOT_FOUND = 'resource_not_found',
UNAUTHORIZED = 'unauthorized',
UNCAUGHT_EXCEPTION = 'uncaught_exception',

View file

@ -8,6 +8,8 @@
import { errors } from '@elastic/elasticsearch';
import { ElasticsearchClient } from '@kbn/core/server';
import { ErrorCode } from '../../../../../../common/types/error_codes';
import { deleteMlInferencePipeline } from './delete_ml_inference_pipeline';
describe('deleteMlInferencePipeline lib function', () => {
@ -72,7 +74,9 @@ describe('deleteMlInferencePipeline lib function', () => {
});
it('should succeed when parent pipeline is missing', async () => {
mockClient.ingest.getPipeline.mockImplementation(() => Promise.reject(notFoundError));
mockClient.ingest.getPipeline
.mockImplementationOnce(() => Promise.resolve({})) // 1st call (get *@ml-inference)
.mockImplementation(() => Promise.reject(notFoundError)); // Subsequent calls
mockClient.ingest.deletePipeline.mockImplementation(() =>
Promise.resolve({ acknowledged: true })
);
@ -115,4 +119,36 @@ describe('deleteMlInferencePipeline lib function', () => {
id: 'my-ml-pipeline',
});
});
it("should fail when pipeline is used in another index's pipeline", async () => {
const mockGetPipelines = {
...mockGetPipeline, // References my-ml-pipeline
'my-other-index@ml-inference': {
id: 'my-other-index@ml-inference',
processors: [
{
pipeline: {
name: 'my-ml-pipeline', // Also references my-ml-pipeline
},
},
],
},
};
mockClient.ingest.getPipeline
.mockImplementationOnce(() => Promise.resolve(mockGetPipelines)) // 1st call
.mockImplementation(() => Promise.resolve(mockGetPipeline)); // Subsequent calls
mockClient.ingest.deletePipeline.mockImplementation(() => Promise.reject(notFoundError));
await expect(
deleteMlInferencePipeline(
'my-index',
'my-ml-pipeline',
mockClient as unknown as ElasticsearchClient
)
).rejects.toThrow(ErrorCode.PIPELINE_IS_IN_USE);
expect(mockClient.ingest.putPipeline).toHaveBeenCalledTimes(0);
expect(mockClient.ingest.deletePipeline).toHaveBeenCalledTimes(0);
});
});

View file

@ -7,8 +7,11 @@
import { ElasticsearchClient } from '@kbn/core/server';
import { ErrorCode } from '../../../../../../common/types/error_codes';
import { DeleteMlInferencePipelineResponse } from '../../../../../../common/types/pipelines';
import { getInferencePipelineNameFromIndexName } from '../../../../../utils/ml_inference_pipeline_utils';
import { detachMlInferencePipeline } from './detach_ml_inference_pipeline';
export const deleteMlInferencePipeline = async (
@ -16,18 +19,18 @@ export const deleteMlInferencePipeline = async (
pipelineName: string,
client: ElasticsearchClient
) => {
let response: DeleteMlInferencePipelineResponse = {};
try {
response = await detachMlInferencePipeline(indexName, pipelineName, client);
} catch (error) {
// only suppress Not Found error
if (error.meta?.statusCode !== 404) {
throw error;
}
// Check if the pipeline is in use in a different index's managed pipeline
const otherPipelineName = await findUsageInOtherManagedPipelines(pipelineName, indexName, client);
if (otherPipelineName) {
throw Object.assign(new Error(ErrorCode.PIPELINE_IS_IN_USE), {
pipelineName: otherPipelineName,
});
}
// finally, delete pipeline
// Detach the pipeline first
const response = await detachPipeline(indexName, pipelineName, client);
// Finally, delete pipeline
const deleteResponse = await client.ingest.deletePipeline({ id: pipelineName });
if (deleteResponse.acknowledged === true) {
response.deleted = pipelineName;
@ -35,3 +38,47 @@ export const deleteMlInferencePipeline = async (
return response;
};
const detachPipeline = async (
indexName: string,
pipelineName: string,
client: ElasticsearchClient
): Promise<DeleteMlInferencePipelineResponse> => {
try {
return await detachMlInferencePipeline(indexName, pipelineName, client);
} catch (error) {
// only suppress Not Found error
if (error.meta?.statusCode !== 404) {
throw error;
}
return {};
}
};
const findUsageInOtherManagedPipelines = async (
pipelineName: string,
indexName: string,
client: ElasticsearchClient
): Promise<string | undefined> => {
try {
// Fetch all managed parent ML pipelines
const pipelines = await client.ingest.getPipeline({
id: '*@ml-inference',
});
// The given inference pipeline is being used in another index's managed pipeline if:
// - The index name is different from the one we're deleting from, AND
// - Its processors contain at least one entry in which the supplied pipeline name is referenced
return Object.entries(pipelines).find(
([name, pipeline]) =>
name !== getInferencePipelineNameFromIndexName(indexName) &&
pipeline.processors?.find((processor) => processor.pipeline?.name === pipelineName)
)?.[0]; // Managed pipeline name
} catch (error) {
// only suppress Not Found error
if (error.meta?.statusCode !== 404) {
throw error;
}
}
};

View file

@ -449,6 +449,26 @@ describe('Enterprise Search Managed Indices', () => {
);
expect(mockRouter.response.customError).toHaveBeenCalledTimes(1);
});
it('raises error if the pipeline is in use', async () => {
(deleteMlInferencePipeline as jest.Mock).mockImplementationOnce(() => {
return Promise.reject({
message: ErrorCode.PIPELINE_IS_IN_USE,
pipelineName: 'my-other-index@ml-inference',
});
});
await mockRouter.callRoute({
params: { indexName, pipelineName },
});
expect(deleteMlInferencePipeline).toHaveBeenCalledWith(
indexName,
pipelineName,
mockClient.asCurrentUser
);
expect(mockRouter.response.customError).toHaveBeenCalledTimes(1);
});
});
describe('POST /internal/enterprise_search/indices/{indexName}/ml_inference/pipeline_processors/simulate', () => {

View file

@ -48,6 +48,7 @@ import { createError } from '../../utils/create_error';
import { elasticsearchErrorHandler } from '../../utils/elasticsearch_error_handler';
import {
isIndexNotFoundException,
isPipelineIsInUseException,
isResourceNotFoundException,
} from '../../utils/identify_exceptions';
import { getPrefixedInferencePipelineProcessorName } from '../../utils/ml_inference_pipeline_utils';
@ -697,7 +698,24 @@ export function registerIndexRoutes({
response,
statusCode: 404,
});
} else if (isPipelineIsInUseException(error)) {
return createError({
errorCode: ErrorCode.PIPELINE_IS_IN_USE,
message: i18n.translate(
'xpack.enterpriseSearch.server.routes.indices.mlInference.pipelineProcessors.pipelineIsInUseError',
{
defaultMessage:
"Inference pipeline is used in managed pipeline '{pipelineName}' of a different index",
values: {
pipelineName: error.pipelineName,
},
}
),
response,
statusCode: 400,
});
}
// otherwise, let the default handler wrap it
throw error;
}

View file

@ -5,6 +5,8 @@
* 2.0.
*/
import { ErrorCode } from '../../common/types/error_codes';
export interface ElasticsearchResponseError {
meta?: {
body?: {
@ -28,3 +30,6 @@ export const isResourceNotFoundException = (error: ElasticsearchResponseError) =
export const isUnauthorizedException = (error: ElasticsearchResponseError) =>
error.meta?.statusCode === 403;
export const isPipelineIsInUseException = (error: Error) =>
error.message === ErrorCode.PIPELINE_IS_IN_USE;