[ML] Versioning AIOps APIs (#158806)

Adds versioning to the AIOps API.
Versions are added to the server side routes and to the client side
functions which call the routes.
Updates API tests to add the API version to the request headers.

The single API endpoint is already internal and now has been given the
version '1'.

**Internal APIs**

`/internal/aiops/explain_log_rate_spikes`
This commit is contained in:
Walter Rafelsberger 2023-06-01 16:36:59 +02:00 committed by GitHub
parent 648c669034
commit 423cb35fd7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 844 additions and 691 deletions

View file

@ -51,6 +51,7 @@ export const PageReducerStream: FC = () => {
typeof basePath
>(
`${basePath}/internal/response_stream/reducer_stream`,
'1',
{ compressResponse, simulateErrors },
{ reducer: reducerStreamReducer, initialState }
);

View file

@ -34,7 +34,7 @@ export const PageSimpleStringStream: FC = () => {
const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream,
typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`, {
>(`${basePath}/internal/response_stream/simple_string_stream`, '1', {
compressResponse,
timeout: 500,
});

View file

@ -20,103 +20,110 @@ import {
import { API_ENDPOINT } from '../../common/api';
export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
router.post(
{
router.versioned
.post({
path: API_ENDPOINT.REDUCER_STREAM,
validate: {
body: reducerStreamRequestBodySchema,
access: 'internal',
})
.addVersion(
{
version: '1',
validate: {
request: {
body: reducerStreamRequestBodySchema,
},
},
},
},
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;
const simulateError = request.body.simulateErrors ?? false;
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;
const simulateError = request.body.simulateErrors ?? false;
let logMessageCounter = 1;
let logMessageCounter = 1;
function logDebugMessage(msg: string) {
logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`);
logMessageCounter++;
}
function logDebugMessage(msg: string) {
logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`);
logMessageCounter++;
}
logDebugMessage('Starting stream.');
logDebugMessage('Starting stream.');
let shouldStop = false;
request.events.aborted$.subscribe(() => {
logDebugMessage('aborted$ subscription trigger.');
shouldStop = true;
});
request.events.completed$.subscribe(() => {
logDebugMessage('completed$ subscription trigger.');
shouldStop = true;
});
let shouldStop = false;
request.events.aborted$.subscribe(() => {
logDebugMessage('aborted$ subscription trigger.');
shouldStop = true;
});
request.events.completed$.subscribe(() => {
logDebugMessage('completed$ subscription trigger.');
shouldStop = true;
});
const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers,
logger,
request.body.compressResponse
);
const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers,
logger,
request.body.compressResponse
);
const entities = [
'kimchy',
's1monw',
'martijnvg',
'jasontedor',
'nik9000',
'javanna',
'rjernst',
'jrodewig',
];
const entities = [
'kimchy',
's1monw',
'martijnvg',
'jasontedor',
'nik9000',
'javanna',
'rjernst',
'jrodewig',
];
const actions = [...Array(19).fill('add'), 'delete'];
const actions = [...Array(19).fill('add'), 'delete'];
if (simulateError) {
actions.push('throw-error');
actions.push('emit-error');
}
if (simulateError) {
actions.push('throw-error');
actions.push('emit-error');
}
let progress = 0;
let progress = 0;
async function pushStreamUpdate() {
setTimeout(() => {
try {
progress++;
async function pushStreamUpdate() {
setTimeout(() => {
try {
progress++;
if (progress > 100 || shouldStop) {
end();
return;
if (progress > 100 || shouldStop) {
end();
return;
}
push(updateProgressAction(progress));
const randomEntity = entities[Math.floor(Math.random() * entities.length)];
const randomAction = actions[Math.floor(Math.random() * actions.length)];
if (randomAction === 'add') {
const randomCommits = Math.floor(Math.random() * 100);
push(addToEntityAction(randomEntity, randomCommits));
} else if (randomAction === 'delete') {
push(deleteEntityAction(randomEntity));
} else if (randomAction === 'throw-error') {
// Throw an error. It should not crash Kibana!
// It should be caught and logged to the Kibana server console.
throw new Error('There was a (simulated) server side error!');
} else if (randomAction === 'emit-error') {
// Emit an error as a stream action.
push(errorAction('(Simulated) error pushed to the stream'));
return;
}
pushStreamUpdate();
} catch (e) {
logger.error(e);
}
}, Math.floor(Math.random() * maxTimeoutMs));
}
push(updateProgressAction(progress));
// do not call this using `await` so it will run asynchronously while we return the stream already.
pushStreamUpdate();
const randomEntity = entities[Math.floor(Math.random() * entities.length)];
const randomAction = actions[Math.floor(Math.random() * actions.length)];
if (randomAction === 'add') {
const randomCommits = Math.floor(Math.random() * 100);
push(addToEntityAction(randomEntity, randomCommits));
} else if (randomAction === 'delete') {
push(deleteEntityAction(randomEntity));
} else if (randomAction === 'throw-error') {
// Throw an error. It should not crash Kibana!
// It should be caught and logged to the Kibana server console.
throw new Error('There was a (simulated) server side error!');
} else if (randomAction === 'emit-error') {
// Emit an error as a stream action.
push(errorAction('(Simulated) error pushed to the stream'));
return;
}
pushStreamUpdate();
} catch (e) {
logger.error(e);
}
}, Math.floor(Math.random() * maxTimeoutMs));
return response.ok(responseWithHeaders);
}
// do not call this using `await` so it will run asynchronously while we return the stream already.
pushStreamUpdate();
return response.ok(responseWithHeaders);
}
);
);
};

View file

@ -17,63 +17,70 @@ function timeout(ms: number) {
}
export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) => {
router.post(
{
router.versioned
.post({
path: API_ENDPOINT.SIMPLE_STRING_STREAM,
validate: {
body: simpleStringStreamRequestBodySchema,
access: 'internal',
})
.addVersion(
{
version: '1',
validate: {
request: {
body: simpleStringStreamRequestBodySchema,
},
},
},
},
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;
let shouldStop = false;
request.events.aborted$.subscribe(() => {
shouldStop = true;
});
request.events.completed$.subscribe(() => {
shouldStop = true;
});
let shouldStop = false;
request.events.aborted$.subscribe(() => {
shouldStop = true;
});
request.events.completed$.subscribe(() => {
shouldStop = true;
});
const { end, push, responseWithHeaders } = streamFactory(
request.headers,
logger,
request.body.compressResponse
);
const { end, push, responseWithHeaders } = streamFactory(
request.headers,
logger,
request.body.compressResponse
);
const text =
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';
const text =
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';
const tokens = text.split(' ');
const tokens = text.split(' ');
async function pushStreamUpdate() {
try {
if (shouldStop) {
end();
return;
}
const token = tokens.shift();
if (token !== undefined) {
push(`${token} `);
await timeout(Math.floor(Math.random() * maxTimeoutMs));
if (!shouldStop) {
pushStreamUpdate();
async function pushStreamUpdate() {
try {
if (shouldStop) {
end();
return;
}
} else {
end();
const token = tokens.shift();
if (token !== undefined) {
push(`${token} `);
await timeout(Math.floor(Math.random() * maxTimeoutMs));
if (!shouldStop) {
pushStreamUpdate();
}
} else {
end();
}
} catch (e) {
logger.error(`There was an error: ${e.toString()}`);
}
} catch (e) {
logger.error(`There was an error: ${e.toString()}`);
}
// do not call this using `await` so it will run asynchronously while we return the stream already.
pushStreamUpdate();
return response.ok(responseWithHeaders);
}
// do not call this using `await` so it will run asynchronously while we return the stream already.
pushStreamUpdate();
return response.ok(responseWithHeaders);
}
);
);
};

View file

@ -7,6 +7,8 @@
import type { ReducerAction } from 'react';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import type { UseFetchStreamParamsDefault } from './use_fetch_stream';
type GeneratorError = string | null;
@ -23,6 +25,7 @@ type GeneratorError = string | null;
* ```
*
* @param endpoint The API endpoint including the Kibana basepath.
* @param apiVersion - The API version to be used.
* @param abortCtrl Abort controller for cancelling the request.
* @param body The request body. For now all requests are POST.
* @param ndjson Boolean flag to receive the stream as a raw string or NDJSON.
@ -37,6 +40,7 @@ type GeneratorError = string | null;
*/
export async function* fetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: string,
abortCtrl: React.MutableRefObject<AbortController>,
body: I['body'],
ndjson = true,
@ -54,6 +58,7 @@ export async function* fetchStream<I extends UseFetchStreamParamsDefault, BasePa
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
[ELASTIC_HTTP_VERSION_HEADER]: apiVersion,
'kbn-xsrf': 'stream',
},
...(Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),

View file

@ -7,11 +7,31 @@
/**
* Time range definition for baseline and deviation to be used by spike log analysis.
*
* @export
* @interface WindowParameters
* @typedef {WindowParameters}
*/
export interface WindowParameters {
/**
* Baseline minimum value
* @type {number}
*/
baselineMin: number;
/**
* Baseline maximum value
* @type {number}
*/
baselineMax: number;
/**
* Deviation minimum value
* @type {number}
*/
deviationMin: number;
/**
* Deviation maximum value
* @type {number}
*/
deviationMax: number;
}

View file

@ -21,10 +21,31 @@ import { stringReducer, StringReducer } from './string_reducer';
/**
* Custom hook type definition of the base params for an NDJSON stream with custom reducer.
*
* @export
* @interface UseFetchStreamCustomReducerParams
* @typedef {UseFetchStreamCustomReducerParams}
*/
export interface UseFetchStreamCustomReducerParams {
/**
* API endpoint
* @type {string}
*/
endpoint: string;
/**
* API version
* @type {string}
*/
apiVersion: string;
/**
* Request body
* @type {object}
*/
body: object;
/**
* Reducer function to be applied to response chunks.
* @type {Reducer<any, any>}
*/
reducer: Reducer<any, any>;
}
@ -32,11 +53,36 @@ export interface UseFetchStreamCustomReducerParams {
* Custom hook type definition of the base params for a string base stream without a custom reducer.
*/
export interface UseFetchStreamParamsDefault {
/**
* API endpoint
* @type {string}
*/
endpoint: string;
/**
* API version
* @type {string}
*/
apiVersion: string;
/**
* Request body
* @type {object}
*/
body: object;
/**
* Reducer function to be applied to response chunks.
* @type {StringReducer}
*/
reducer: StringReducer;
}
/**
* The return type of the `useFetchStream` hook.
*
* @interface UseFetchStreamReturnType
* @typedef {UseFetchStreamReturnType}
* @template Data
* @template Action
*/
interface UseFetchStreamReturnType<Data, Action> {
cancel: () => void;
data: Data;
@ -47,34 +93,83 @@ interface UseFetchStreamReturnType<Data, Action> {
start: () => Promise<void>;
}
// These overloads allow us to fall back to a simple reducer that just acts on a string as the reducer state
// if no options are supplied. Passing in options will use a custom reducer with appropriate type support.
/**
* This overload allows us to fall back to a simple reducer that
* just acts on a string as the reducer state if no options are supplied.
*
* @export
* @template I
* @template BasePath
* @param {`${I['endpoint']}`} endpoint - API endpoint including Kibana base path.
* @param {I['apiVersion']} apiVersion - API version.
* @param {I['body']} body - API request body.
* @returns {UseFetchStreamReturnType<string, ReducerAction<I['reducer']>>} - An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: I['apiVersion'],
body: I['body']
): UseFetchStreamReturnType<string, ReducerAction<I['reducer']>>;
/**
* This overload covers passing in options and will use
* a custom reducer with appropriate type support.
*
* @export
* @template I
* @template BasePath
* @param {`${I['endpoint']}`} endpoint - API endpoint including Kibana base path.
* @param {I['apiVersion']} apiVersion - API version.
* @param {I['body']} body - API request body.
* @param {{ reducer: I['reducer']; initialState: ReducerState<I['reducer']> }} options - Custom reducer and initial state.
* @returns {UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>>} - An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<
I extends UseFetchStreamCustomReducerParams,
BasePath extends string
>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: I['apiVersion'],
body: I['body'],
options: { reducer: I['reducer']; initialState: ReducerState<I['reducer']> }
options: {
/**
* Custom reducer
* @type {I['reducer']}
*/
reducer: I['reducer'];
/**
* Initial state
* @type {ReducerState<I['reducer']>}
*/
initialState: ReducerState<I['reducer']>;
}
): UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>>;
/**
* Custom hook to receive streaming data.
*
* @param endpoint - API endpoint including Kibana base path.
* @param apiVersion - API version.
* @param body - API request body.
* @param options - Optional custom reducer and initial state.
* @returns An object with streaming data and methods act on the stream.
* @returns An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: string,
body: I['body'],
options?: { reducer: I['reducer']; initialState: ReducerState<I['reducer']> }
options?: {
/**
* Custom reducer
* @type {I['reducer']}
*/
reducer: I['reducer'];
/**
* Initial state
* @type {ReducerState<I['reducer']>}
*/
initialState: ReducerState<I['reducer']>;
}
): UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>> {
const [errors, setErrors] = useState<string[]>([]);
const [isCancelled, setIsCancelled] = useState(false);
@ -106,7 +201,7 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e
for await (const [fetchStreamError, actions] of fetchStream<
UseFetchStreamCustomReducerParams,
BasePath
>(endpoint, abortCtrl, body, options !== undefined)) {
>(endpoint, apiVersion, abortCtrl, body, options !== undefined)) {
if (fetchStreamError !== null) {
addError(fetchStreamError);
} else if (actions.length > 0) {

View file

@ -16,6 +16,7 @@
"kbn_references": [
"@kbn/logging",
"@kbn/core-http-server",
"@kbn/core-http-common",
],
"exclude": [
"target/**/*",

View file

@ -17,6 +17,7 @@ export const API_ENDPOINT = {
export interface ApiExplainLogRateSpikes {
endpoint: typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES;
apiVersion: string;
reducer: typeof streamReducer;
body: AiopsExplainLogRateSpikesSchema;
actions: AiopsExplainLogRateSpikesApiAction;

View file

@ -139,6 +139,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
errors: streamErrors,
} = useFetchStream<ApiExplainLogRateSpikes, typeof basePath>(
`${basePath}/internal/aiops/explain_log_rate_spikes`,
'1',
{
start: earliest,
end: latest,

File diff suppressed because it is too large Load diff

View file

@ -10,8 +10,8 @@ import fetch from 'node-fetch';
import { format as formatUrl } from 'url';
import expect from '@kbn/expect';
import type { ApiExplainLogRateSpikes } from '@kbn/aiops-plugin/common/api';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import type { FtrProviderContext } from '../../ftr_provider_context';
@ -104,6 +104,7 @@ export default ({ getService }: FtrProviderContext) => {
const resp = await supertest
.post(`/internal/aiops/explain_log_rate_spikes`)
.set('kbn-xsrf', 'kibana')
.set(ELASTIC_HTTP_VERSION_HEADER, '1')
.send(body)
.expect(200);
@ -161,6 +162,7 @@ export default ({ getService }: FtrProviderContext) => {
method: 'POST',
headers: {
'Content-Type': 'application/json',
[ELASTIC_HTTP_VERSION_HEADER]: '1',
'kbn-xsrf': 'stream',
},
body: JSON.stringify(body),

View file

@ -10,8 +10,8 @@ import fetch from 'node-fetch';
import { format as formatUrl } from 'url';
import expect from '@kbn/expect';
import type { ApiExplainLogRateSpikes } from '@kbn/aiops-plugin/common/api';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import type { FtrProviderContext } from '../../ftr_provider_context';
@ -96,6 +96,7 @@ export default ({ getService }: FtrProviderContext) => {
const resp = await supertest
.post(`/internal/aiops/explain_log_rate_spikes`)
.set('kbn-xsrf', 'kibana')
.set(ELASTIC_HTTP_VERSION_HEADER, '1')
.send(body)
.expect(200);
@ -158,6 +159,7 @@ export default ({ getService }: FtrProviderContext) => {
method: 'POST',
headers: {
'Content-Type': 'application/json',
[ELASTIC_HTTP_VERSION_HEADER]: '1',
'kbn-xsrf': 'stream',
},
body: JSON.stringify(body),

View file

@ -6,6 +6,7 @@
*/
import expect from '@kbn/expect';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import type { FtrProviderContext } from '../../ftr_provider_context';
@ -21,6 +22,7 @@ export default ({ getService }: FtrProviderContext) => {
const resp = await supertest
.post(`/internal/aiops/explain_log_rate_spikes`)
.set('kbn-xsrf', 'kibana')
.set(ELASTIC_HTTP_VERSION_HEADER, '1')
.send({
...testData.requestBody,
index: 'does_not_exist',