mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[Ingest Pipelines] Migrate to new ES client (#96406)
* - migrated use of legacy.client to client - removed use of isEsError to detect legacy errors - refactored types to use types from @elastic/elasticsearch instead (where appropriate) tested get, put, post, delete, simulate and documents endpoints locally * remove use of legacyEs service in functional test * fixing type issues and API response object * remove id from get all request! * reinstated logic for handling 404 from get all pipelines request * clarify error handling with comments and small variable name refactor * updated delete error responses * update functional test * refactor use of legacyEs Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
ec62be7ca8
commit
f31e13c426
19 changed files with 85 additions and 154 deletions
|
@ -17,8 +17,10 @@ interface EsErrorHandlerParams {
|
|||
handleCustomError?: () => IKibanaResponse<any>;
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* For errors returned by the new elasticsearch js client.
|
||||
*
|
||||
* @throws If "error" is not an error from the elasticsearch client this handler will throw "error".
|
||||
*/
|
||||
export const handleEsError = ({
|
||||
error,
|
||||
|
|
|
@ -5,14 +5,17 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { PipelinesByName, Pipeline } from '../types';
|
||||
import { Pipeline as ESPipeline } from '@elastic/elasticsearch/api/types';
|
||||
import { Pipeline, Processor } from '../types';
|
||||
|
||||
export function deserializePipelines(pipelinesByName: PipelinesByName): Pipeline[] {
|
||||
export function deserializePipelines(pipelinesByName: { [key: string]: ESPipeline }): Pipeline[] {
|
||||
const pipelineNames: string[] = Object.keys(pipelinesByName);
|
||||
|
||||
const deserializedPipelines = pipelineNames.map((name: string) => {
|
||||
const deserializedPipelines = pipelineNames.map<Pipeline>((name: string) => {
|
||||
return {
|
||||
...pipelinesByName[name],
|
||||
processors: (pipelinesByName[name]?.processors as Processor[]) ?? [],
|
||||
on_failure: pipelinesByName[name]?.on_failure as Processor[],
|
||||
name,
|
||||
};
|
||||
});
|
||||
|
|
|
@ -19,7 +19,7 @@ export interface Processor {
|
|||
|
||||
export interface Pipeline {
|
||||
name: string;
|
||||
description: string;
|
||||
description?: string;
|
||||
version?: number;
|
||||
processors: Processor[];
|
||||
on_failure?: Processor[];
|
||||
|
|
|
@ -13,7 +13,7 @@ import { PLUGIN_ID, PLUGIN_MIN_LICENSE_TYPE } from '../common/constants';
|
|||
|
||||
import { License } from './services';
|
||||
import { ApiRoutes } from './routes';
|
||||
import { isEsError } from './shared_imports';
|
||||
import { handleEsError } from './shared_imports';
|
||||
import { Dependencies } from './types';
|
||||
|
||||
export class IngestPipelinesPlugin implements Plugin<void, void, any, any> {
|
||||
|
@ -66,7 +66,7 @@ export class IngestPipelinesPlugin implements Plugin<void, void, any, any> {
|
|||
isSecurityEnabled: () => security !== undefined && security.license.isEnabled(),
|
||||
},
|
||||
lib: {
|
||||
isEsError,
|
||||
handleEsError,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
|
@ -11,8 +11,7 @@ import { schema } from '@kbn/config-schema';
|
|||
import { Pipeline } from '../../../common/types';
|
||||
import { API_BASE_PATH } from '../../../common/constants';
|
||||
import { RouteDependencies } from '../../types';
|
||||
import { pipelineSchema } from './pipeline_schema';
|
||||
import { isObjectWithKeys } from './shared';
|
||||
import { pipelineSchema } from './shared';
|
||||
|
||||
const bodySchema = schema.object({
|
||||
name: schema.string(),
|
||||
|
@ -22,7 +21,7 @@ const bodySchema = schema.object({
|
|||
export const registerCreateRoute = ({
|
||||
router,
|
||||
license,
|
||||
lib: { isEsError },
|
||||
lib: { handleEsError },
|
||||
}: RouteDependencies): void => {
|
||||
router.post(
|
||||
{
|
||||
|
@ -32,7 +31,7 @@ export const registerCreateRoute = ({
|
|||
},
|
||||
},
|
||||
license.guardApiRoute(async (ctx, req, res) => {
|
||||
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
|
||||
const { client: clusterClient } = ctx.core.elasticsearch;
|
||||
const pipeline = req.body as Pipeline;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
|
@ -40,7 +39,9 @@ export const registerCreateRoute = ({
|
|||
|
||||
try {
|
||||
// Check that a pipeline with the same name doesn't already exist
|
||||
const pipelineByName = await callAsCurrentUser('ingest.getPipeline', { id: name });
|
||||
const { body: pipelineByName } = await clusterClient.asCurrentUser.ingest.getPipeline({
|
||||
id: name,
|
||||
});
|
||||
|
||||
if (pipelineByName[name]) {
|
||||
return res.conflict({
|
||||
|
@ -59,7 +60,7 @@ export const registerCreateRoute = ({
|
|||
}
|
||||
|
||||
try {
|
||||
const response = await callAsCurrentUser('ingest.putPipeline', {
|
||||
const { body: response } = await clusterClient.asCurrentUser.ingest.putPipeline({
|
||||
id: name,
|
||||
body: {
|
||||
description,
|
||||
|
@ -71,19 +72,7 @@ export const registerCreateRoute = ({
|
|||
|
||||
return res.ok({ body: response });
|
||||
} catch (error) {
|
||||
if (isEsError(error)) {
|
||||
return res.customError({
|
||||
statusCode: error.statusCode,
|
||||
body: isObjectWithKeys(error.body)
|
||||
? {
|
||||
message: error.message,
|
||||
attributes: error.body,
|
||||
}
|
||||
: error,
|
||||
});
|
||||
}
|
||||
|
||||
throw error;
|
||||
return handleEsError({ error, response: res });
|
||||
}
|
||||
})
|
||||
);
|
||||
|
|
|
@ -23,7 +23,7 @@ export const registerDeleteRoute = ({ router, license }: RouteDependencies): voi
|
|||
},
|
||||
},
|
||||
license.guardApiRoute(async (ctx, req, res) => {
|
||||
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
|
||||
const { client: clusterClient } = ctx.core.elasticsearch;
|
||||
const { names } = req.params;
|
||||
const pipelineNames = names.split(',');
|
||||
|
||||
|
@ -34,14 +34,16 @@ export const registerDeleteRoute = ({ router, license }: RouteDependencies): voi
|
|||
|
||||
await Promise.all(
|
||||
pipelineNames.map((pipelineName) => {
|
||||
return callAsCurrentUser('ingest.deletePipeline', { id: pipelineName })
|
||||
return clusterClient.asCurrentUser.ingest
|
||||
.deletePipeline({ id: pipelineName })
|
||||
.then(() => response.itemsDeleted.push(pipelineName))
|
||||
.catch((e) =>
|
||||
.catch((e) => {
|
||||
response.errors.push({
|
||||
error: e?.meta?.body?.error ?? e,
|
||||
status: e?.meta?.body?.status,
|
||||
name: pipelineName,
|
||||
error: e,
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ const paramsSchema = schema.object({
|
|||
export const registerDocumentsRoute = ({
|
||||
router,
|
||||
license,
|
||||
lib: { isEsError },
|
||||
lib: { handleEsError },
|
||||
}: RouteDependencies): void => {
|
||||
router.get(
|
||||
{
|
||||
|
@ -28,11 +28,11 @@ export const registerDocumentsRoute = ({
|
|||
},
|
||||
},
|
||||
license.guardApiRoute(async (ctx, req, res) => {
|
||||
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
|
||||
const { client: clusterClient } = ctx.core.elasticsearch;
|
||||
const { index, id } = req.params;
|
||||
|
||||
try {
|
||||
const document = await callAsCurrentUser('get', { index, id });
|
||||
const { body: document } = await clusterClient.asCurrentUser.get({ index, id });
|
||||
|
||||
const { _id, _index, _source } = document;
|
||||
|
||||
|
@ -44,14 +44,7 @@ export const registerDocumentsRoute = ({
|
|||
},
|
||||
});
|
||||
} catch (error) {
|
||||
if (isEsError(error)) {
|
||||
return res.customError({
|
||||
statusCode: error.statusCode,
|
||||
body: error,
|
||||
});
|
||||
}
|
||||
|
||||
throw error;
|
||||
return handleEsError({ error, response: res });
|
||||
}
|
||||
})
|
||||
);
|
||||
|
|
|
@ -18,33 +18,26 @@ const paramsSchema = schema.object({
|
|||
export const registerGetRoutes = ({
|
||||
router,
|
||||
license,
|
||||
lib: { isEsError },
|
||||
lib: { handleEsError },
|
||||
}: RouteDependencies): void => {
|
||||
// Get all pipelines
|
||||
router.get(
|
||||
{ path: API_BASE_PATH, validate: false },
|
||||
license.guardApiRoute(async (ctx, req, res) => {
|
||||
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
|
||||
const { client: clusterClient } = ctx.core.elasticsearch;
|
||||
|
||||
try {
|
||||
const pipelines = await callAsCurrentUser('ingest.getPipeline');
|
||||
const { body: pipelines } = await clusterClient.asCurrentUser.ingest.getPipeline();
|
||||
|
||||
return res.ok({ body: deserializePipelines(pipelines) });
|
||||
} catch (error) {
|
||||
if (isEsError(error)) {
|
||||
const esErrorResponse = handleEsError({ error, response: res });
|
||||
if (esErrorResponse.status === 404) {
|
||||
// ES returns 404 when there are no pipelines
|
||||
// Instead, we return an empty array and 200 status back to the client
|
||||
if (error.status === 404) {
|
||||
return res.ok({ body: [] });
|
||||
}
|
||||
|
||||
return res.customError({
|
||||
statusCode: error.statusCode,
|
||||
body: error,
|
||||
});
|
||||
return res.ok({ body: [] });
|
||||
}
|
||||
|
||||
throw error;
|
||||
return esErrorResponse;
|
||||
}
|
||||
})
|
||||
);
|
||||
|
@ -58,27 +51,22 @@ export const registerGetRoutes = ({
|
|||
},
|
||||
},
|
||||
license.guardApiRoute(async (ctx, req, res) => {
|
||||
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
|
||||
const { client: clusterClient } = ctx.core.elasticsearch;
|
||||
const { name } = req.params;
|
||||
|
||||
try {
|
||||
const pipeline = await callAsCurrentUser('ingest.getPipeline', { id: name });
|
||||
const { body: pipelines } = await clusterClient.asCurrentUser.ingest.getPipeline({
|
||||
id: name,
|
||||
});
|
||||
|
||||
return res.ok({
|
||||
body: {
|
||||
...pipeline[name],
|
||||
...pipelines[name],
|
||||
name,
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
if (isEsError(error)) {
|
||||
return res.customError({
|
||||
statusCode: error.statusCode,
|
||||
body: error,
|
||||
});
|
||||
}
|
||||
|
||||
throw error;
|
||||
return handleEsError({ error, response: res });
|
||||
}
|
||||
})
|
||||
);
|
||||
|
|
|
@ -36,24 +36,13 @@ export const registerPrivilegesRoute = ({ license, router, config }: RouteDepend
|
|||
return res.ok({ body: privilegesResult });
|
||||
}
|
||||
|
||||
const {
|
||||
core: {
|
||||
elasticsearch: {
|
||||
legacy: { client },
|
||||
},
|
||||
},
|
||||
} = ctx;
|
||||
const { client: clusterClient } = ctx.core.elasticsearch;
|
||||
|
||||
const { has_all_requested: hasAllPrivileges, cluster } = await client.callAsCurrentUser(
|
||||
'transport.request',
|
||||
{
|
||||
path: '/_security/user/_has_privileges',
|
||||
method: 'POST',
|
||||
body: {
|
||||
cluster: APP_CLUSTER_REQUIRED_PRIVILEGES,
|
||||
},
|
||||
}
|
||||
);
|
||||
const {
|
||||
body: { has_all_requested: hasAllPrivileges, cluster },
|
||||
} = await clusterClient.asCurrentUser.security.hasPrivileges({
|
||||
body: { cluster: APP_CLUSTER_REQUIRED_PRIVILEGES },
|
||||
});
|
||||
|
||||
if (!hasAllPrivileges) {
|
||||
privilegesResult.missingPrivileges.cluster = extractMissingPrivileges(cluster);
|
||||
|
|
|
@ -5,4 +5,4 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
export { isObjectWithKeys } from './is_object_with_keys';
|
||||
export { pipelineSchema } from './pipeline_schema';
|
||||
|
|
|
@ -1,10 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
export const isObjectWithKeys = (value: unknown) => {
|
||||
return typeof value === 'object' && !!value && Object.keys(value).length > 0;
|
||||
};
|
|
@ -4,12 +4,12 @@
|
|||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { SimulatePipelineDocument } from '@elastic/elasticsearch/api/types';
|
||||
import { schema } from '@kbn/config-schema';
|
||||
|
||||
import { API_BASE_PATH } from '../../../common/constants';
|
||||
import { RouteDependencies } from '../../types';
|
||||
import { pipelineSchema } from './pipeline_schema';
|
||||
import { pipelineSchema } from './shared';
|
||||
|
||||
const bodySchema = schema.object({
|
||||
pipeline: schema.object(pipelineSchema),
|
||||
|
@ -20,7 +20,7 @@ const bodySchema = schema.object({
|
|||
export const registerSimulateRoute = ({
|
||||
router,
|
||||
license,
|
||||
lib: { isEsError },
|
||||
lib: { handleEsError },
|
||||
}: RouteDependencies): void => {
|
||||
router.post(
|
||||
{
|
||||
|
@ -30,29 +30,22 @@ export const registerSimulateRoute = ({
|
|||
},
|
||||
},
|
||||
license.guardApiRoute(async (ctx, req, res) => {
|
||||
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
|
||||
const { client: clusterClient } = ctx.core.elasticsearch;
|
||||
|
||||
const { pipeline, documents, verbose } = req.body;
|
||||
|
||||
try {
|
||||
const response = await callAsCurrentUser('ingest.simulate', {
|
||||
const { body: response } = await clusterClient.asCurrentUser.ingest.simulate({
|
||||
verbose,
|
||||
body: {
|
||||
pipeline,
|
||||
docs: documents,
|
||||
docs: documents as SimulatePipelineDocument[],
|
||||
},
|
||||
});
|
||||
|
||||
return res.ok({ body: response });
|
||||
} catch (error) {
|
||||
if (isEsError(error)) {
|
||||
return res.customError({
|
||||
statusCode: error.statusCode,
|
||||
body: error,
|
||||
});
|
||||
}
|
||||
|
||||
throw error;
|
||||
return handleEsError({ error, response: res });
|
||||
}
|
||||
})
|
||||
);
|
||||
|
|
|
@ -9,8 +9,7 @@ import { schema } from '@kbn/config-schema';
|
|||
|
||||
import { API_BASE_PATH } from '../../../common/constants';
|
||||
import { RouteDependencies } from '../../types';
|
||||
import { pipelineSchema } from './pipeline_schema';
|
||||
import { isObjectWithKeys } from './shared';
|
||||
import { pipelineSchema } from './shared';
|
||||
|
||||
const bodySchema = schema.object(pipelineSchema);
|
||||
|
||||
|
@ -21,7 +20,7 @@ const paramsSchema = schema.object({
|
|||
export const registerUpdateRoute = ({
|
||||
router,
|
||||
license,
|
||||
lib: { isEsError },
|
||||
lib: { handleEsError },
|
||||
}: RouteDependencies): void => {
|
||||
router.put(
|
||||
{
|
||||
|
@ -32,16 +31,16 @@ export const registerUpdateRoute = ({
|
|||
},
|
||||
},
|
||||
license.guardApiRoute(async (ctx, req, res) => {
|
||||
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
|
||||
const { client: clusterClient } = ctx.core.elasticsearch;
|
||||
const { name } = req.params;
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
const { description, processors, version, on_failure } = req.body;
|
||||
|
||||
try {
|
||||
// Verify pipeline exists; ES will throw 404 if it doesn't
|
||||
await callAsCurrentUser('ingest.getPipeline', { id: name });
|
||||
await clusterClient.asCurrentUser.ingest.getPipeline({ id: name });
|
||||
|
||||
const response = await callAsCurrentUser('ingest.putPipeline', {
|
||||
const { body: response } = await clusterClient.asCurrentUser.ingest.putPipeline({
|
||||
id: name,
|
||||
body: {
|
||||
description,
|
||||
|
@ -53,19 +52,7 @@ export const registerUpdateRoute = ({
|
|||
|
||||
return res.ok({ body: response });
|
||||
} catch (error) {
|
||||
if (isEsError(error)) {
|
||||
return res.customError({
|
||||
statusCode: error.statusCode,
|
||||
body: isObjectWithKeys(error.body)
|
||||
? {
|
||||
message: error.message,
|
||||
attributes: error.body,
|
||||
}
|
||||
: error,
|
||||
});
|
||||
}
|
||||
|
||||
throw error;
|
||||
return handleEsError({ error, response: res });
|
||||
}
|
||||
})
|
||||
);
|
||||
|
|
|
@ -5,4 +5,4 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
export { isEsError } from '../../../../src/plugins/es_ui_shared/server';
|
||||
export { handleEsError } from '../../../../src/plugins/es_ui_shared/server';
|
||||
|
|
|
@ -10,7 +10,7 @@ import { LicensingPluginSetup } from '../../licensing/server';
|
|||
import { SecurityPluginSetup } from '../../security/server';
|
||||
import { PluginSetupContract as FeaturesPluginSetup } from '../../features/server';
|
||||
import { License } from './services';
|
||||
import { isEsError } from './shared_imports';
|
||||
import { handleEsError } from './shared_imports';
|
||||
|
||||
export interface Dependencies {
|
||||
security: SecurityPluginSetup;
|
||||
|
@ -25,6 +25,6 @@ export interface RouteDependencies {
|
|||
isSecurityEnabled: () => boolean;
|
||||
};
|
||||
lib: {
|
||||
isEsError: typeof isEsError;
|
||||
handleEsError: typeof handleEsError;
|
||||
};
|
||||
}
|
||||
|
|
|
@ -204,7 +204,8 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
expect(body).to.eql({
|
||||
statusCode: 404,
|
||||
error: 'Not Found',
|
||||
message: 'Not Found',
|
||||
message: 'Response Error',
|
||||
attributes: {},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -339,24 +340,16 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
{
|
||||
name: PIPELINE_DOES_NOT_EXIST,
|
||||
error: {
|
||||
msg: '[resource_not_found_exception] pipeline [pipeline_does_not_exist] is missing',
|
||||
path: '/_ingest/pipeline/pipeline_does_not_exist',
|
||||
query: {},
|
||||
statusCode: 404,
|
||||
response: JSON.stringify({
|
||||
error: {
|
||||
root_cause: [
|
||||
{
|
||||
type: 'resource_not_found_exception',
|
||||
reason: 'pipeline [pipeline_does_not_exist] is missing',
|
||||
},
|
||||
],
|
||||
root_cause: [
|
||||
{
|
||||
type: 'resource_not_found_exception',
|
||||
reason: 'pipeline [pipeline_does_not_exist] is missing',
|
||||
},
|
||||
status: 404,
|
||||
}),
|
||||
],
|
||||
type: 'resource_not_found_exception',
|
||||
reason: 'pipeline [pipeline_does_not_exist] is missing',
|
||||
},
|
||||
status: 404,
|
||||
},
|
||||
],
|
||||
});
|
||||
|
@ -501,8 +494,9 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
|
||||
expect(body).to.eql({
|
||||
error: 'Not Found',
|
||||
message: 'Not Found',
|
||||
message: 'Response Error',
|
||||
statusCode: 404,
|
||||
attributes: {},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -30,17 +30,18 @@ interface Pipeline {
|
|||
export const registerEsHelpers = (getService: FtrProviderContext['getService']) => {
|
||||
let pipelinesCreated: string[] = [];
|
||||
|
||||
const es = getService('legacyEs');
|
||||
const es = getService('es');
|
||||
|
||||
const createPipeline = (pipeline: Pipeline, cachePipeline?: boolean) => {
|
||||
if (cachePipeline) {
|
||||
pipelinesCreated.push(pipeline.id);
|
||||
}
|
||||
|
||||
return es.ingest.putPipeline(pipeline);
|
||||
return es.ingest.putPipeline(pipeline).then(({ body }) => body);
|
||||
};
|
||||
|
||||
const deletePipeline = (pipelineId: string) => es.ingest.deletePipeline({ id: pipelineId });
|
||||
const deletePipeline = (pipelineId: string) =>
|
||||
es.ingest.deletePipeline({ id: pipelineId }).then(({ body }) => body);
|
||||
|
||||
const cleanupPipelines = () =>
|
||||
Promise.all(pipelinesCreated.map(deletePipeline))
|
||||
|
@ -53,11 +54,11 @@ export const registerEsHelpers = (getService: FtrProviderContext['getService'])
|
|||
});
|
||||
|
||||
const createIndex = (index: { index: string; id: string; body: object }) => {
|
||||
return es.index(index);
|
||||
return es.index(index).then(({ body }) => body);
|
||||
};
|
||||
|
||||
const deleteIndex = (indexName: string) => {
|
||||
return es.indices.delete({ index: indexName });
|
||||
return es.indices.delete({ index: indexName }).then(({ body }) => body);
|
||||
};
|
||||
|
||||
return {
|
||||
|
|
|
@ -17,7 +17,7 @@ const PIPELINE = {
|
|||
export default ({ getPageObjects, getService }: FtrProviderContext) => {
|
||||
const pageObjects = getPageObjects(['common', 'ingestPipelines']);
|
||||
const log = getService('log');
|
||||
const es = getService('legacyEs');
|
||||
const es = getService('es');
|
||||
|
||||
describe('Ingest Pipelines', function () {
|
||||
this.tags('smoke');
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue