[ML] AIOps: Use Kibana's http service instead of fetch, fix throttling. (#162335)

- Originally Kibana's `http` service did not support receiving streams,
that's why we used plain `fetch` for this. This has been fixed in
#158678, so this PR updates the streaming helpers to use Kibana's `http`
service from now on.
- The PR also breaks out the response stream code into its own package
and restructures it to separate client and server side code. This brings
down the `aiops` bundle size by `~300KB`! 🥳
- The approach to client side throttling/buffering was also revamped:
There was an issue doing the throttling inside the generator function,
it always waited for the timeout. The buffering is now removed from
`fetchStream`, instead `useThrottle` from `react-use` is used on the
reduced `data` in `useFetchStream`. Loading log rate analysis results
got a lot snappier with this update!
This commit is contained in:
Walter Rafelsberger 2023-07-27 08:57:10 +02:00 committed by GitHub
parent 682c772e09
commit 0ab24e566c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 385 additions and 453 deletions

1
.github/CODEOWNERS vendored
View file

@ -501,6 +501,7 @@ x-pack/packages/ml/number_utils @elastic/ml-ui
x-pack/plugins/ml @elastic/ml-ui
x-pack/packages/ml/query_utils @elastic/ml-ui
x-pack/packages/ml/random_sampler_utils @elastic/ml-ui
x-pack/packages/ml/response_stream @elastic/ml-ui
x-pack/packages/ml/route_utils @elastic/ml-ui
x-pack/packages/ml/runtime_field_utils @elastic/ml-ui
x-pack/packages/ml/string_hash @elastic/ml-ui

View file

@ -6,7 +6,7 @@ To run Kibana with the described examples, use `yarn start --run-examples`.
The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates two use cases to get started: Streaming a raw string as well as a more complex example that streams Redux-like actions to the client which update React state via `useReducer()`.
Code in `@kbn/aiops-utils` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data.
Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data.
No additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state.
@ -21,8 +21,12 @@ The request's headers get passed on to automatically identify if compression is
On the client, the custom hook is used like this:
```ts
const { errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream, typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`);
const {
errors,
start,
cancel,
data,
isRunning
} = useFetchStream('/internal/response_stream/simple_string_stream');
```

View file

@ -6,31 +6,7 @@
* Side Public License, v 1.
*/
import type {
UseFetchStreamCustomReducerParams,
UseFetchStreamParamsDefault,
} from '@kbn/aiops-utils';
import {
reducerStreamReducer,
ReducerStreamRequestBodySchema,
ReducerStreamApiAction,
} from './reducer_stream';
import { SimpleStringStreamRequestBodySchema } from './simple_string_stream';
export const API_ENDPOINT = {
export const RESPONSE_STREAM_API_ENDPOINT = {
REDUCER_STREAM: '/internal/response_stream/reducer_stream',
SIMPLE_STRING_STREAM: '/internal/response_stream/simple_string_stream',
} as const;
export interface ApiReducerStream extends UseFetchStreamCustomReducerParams {
endpoint: typeof API_ENDPOINT.REDUCER_STREAM;
reducer: typeof reducerStreamReducer;
body: ReducerStreamRequestBodySchema;
actions: ReducerStreamApiAction;
}
export interface ApiSimpleStringStream extends UseFetchStreamParamsDefault {
endpoint: typeof API_ENDPOINT.SIMPLE_STRING_STREAM;
body: SimpleStringStreamRequestBodySchema;
}

View file

@ -21,14 +21,14 @@ import {
EuiText,
} from '@elastic/eui';
import { useFetchStream } from '@kbn/aiops-utils';
import { useFetchStream } from '@kbn/ml-response-stream/client';
import { ApiReducerStream } from '../../../../../common/api';
import {
initialState,
resetStream,
reducerStreamReducer,
} from '../../../../../common/api/reducer_stream/reducer';
import { RESPONSE_STREAM_API_ENDPOINT } from '../../../../../common/api';
import { Page } from '../../../../components/page';
@ -41,16 +41,12 @@ export const PageReducerStream: FC = () => {
core: { http, notifications },
} = useDeps();
const basePath = http?.basePath.get() ?? '';
const [simulateErrors, setSimulateErrors] = useState(false);
const [compressResponse, setCompressResponse] = useState(true);
const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream<
ApiReducerStream,
typeof basePath
>(
`${basePath}/internal/response_stream/reducer_stream`,
const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream(
http,
RESPONSE_STREAM_API_ENDPOINT.REDUCER_STREAM,
'1',
{ compressResponse, simulateErrors },
{ reducer: reducerStreamReducer, initialState }

View file

@ -18,26 +18,27 @@ import {
EuiText,
} from '@elastic/eui';
import { useFetchStream } from '@kbn/aiops-utils';
import { useFetchStream } from '@kbn/ml-response-stream/client';
import { ApiSimpleStringStream } from '../../../../../common/api';
import { RESPONSE_STREAM_API_ENDPOINT } from '../../../../../common/api';
import { useDeps } from '../../../../hooks/use_deps';
import { Page } from '../../../../components/page';
export const PageSimpleStringStream: FC = () => {
const { core } = useDeps();
const basePath = core.http?.basePath.get() ?? '';
const [compressResponse, setCompressResponse] = useState(true);
const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream,
typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`, '1', {
compressResponse,
timeout: 500,
});
const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream(
core.http,
RESPONSE_STREAM_API_ENDPOINT.SIMPLE_STRING_STREAM,
'1',
{
compressResponse,
timeout: 500,
}
);
const onClickHandler = async () => {
if (isRunning) {

View file

@ -7,7 +7,7 @@
*/
import type { IRouter, Logger } from '@kbn/core/server';
import { streamFactory } from '@kbn/aiops-utils';
import { streamFactory } from '@kbn/ml-response-stream/server';
import {
errorAction,
@ -17,12 +17,12 @@ import {
deleteEntityAction,
ReducerStreamApiAction,
} from '../../common/api/reducer_stream';
import { API_ENDPOINT } from '../../common/api';
import { RESPONSE_STREAM_API_ENDPOINT } from '../../common/api';
export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
router.versioned
.post({
path: API_ENDPOINT.REDUCER_STREAM,
path: RESPONSE_STREAM_API_ENDPOINT.REDUCER_STREAM,
access: 'internal',
})
.addVersion(

View file

@ -7,10 +7,10 @@
*/
import type { IRouter, Logger } from '@kbn/core/server';
import { streamFactory } from '@kbn/aiops-utils';
import { streamFactory } from '@kbn/ml-response-stream/server';
import { simpleStringStreamRequestBodySchema } from '../../common/api/simple_string_stream';
import { API_ENDPOINT } from '../../common/api';
import { RESPONSE_STREAM_API_ENDPOINT } from '../../common/api';
function timeout(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
@ -19,7 +19,7 @@ function timeout(ms: number) {
export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) => {
router.versioned
.post({
path: API_ENDPOINT.SIMPLE_STRING_STREAM,
path: RESPONSE_STREAM_API_ENDPOINT.SIMPLE_STRING_STREAM,
access: 'internal',
})
.addVersion(

View file

@ -19,8 +19,8 @@
"@kbn/developer-examples-plugin",
"@kbn/data-plugin",
"@kbn/kibana-react-plugin",
"@kbn/aiops-utils",
"@kbn/config-schema",
"@kbn/shared-ux-router",
"@kbn/ml-response-stream",
]
}

View file

@ -517,6 +517,7 @@
"@kbn/ml-plugin": "link:x-pack/plugins/ml",
"@kbn/ml-query-utils": "link:x-pack/packages/ml/query_utils",
"@kbn/ml-random-sampler-utils": "link:x-pack/packages/ml/random_sampler_utils",
"@kbn/ml-response-stream": "link:x-pack/packages/ml/response_stream",
"@kbn/ml-route-utils": "link:x-pack/packages/ml/route_utils",
"@kbn/ml-runtime-field-utils": "link:x-pack/packages/ml/runtime_field_utils",
"@kbn/ml-string-hash": "link:x-pack/packages/ml/string_hash",

View file

@ -996,6 +996,8 @@
"@kbn/ml-query-utils/*": ["x-pack/packages/ml/query_utils/*"],
"@kbn/ml-random-sampler-utils": ["x-pack/packages/ml/random_sampler_utils"],
"@kbn/ml-random-sampler-utils/*": ["x-pack/packages/ml/random_sampler_utils/*"],
"@kbn/ml-response-stream": ["x-pack/packages/ml/response_stream"],
"@kbn/ml-response-stream/*": ["x-pack/packages/ml/response_stream/*"],
"@kbn/ml-route-utils": ["x-pack/packages/ml/route_utils"],
"@kbn/ml-route-utils/*": ["x-pack/packages/ml/route_utils/*"],
"@kbn/ml-runtime-field-utils": ["x-pack/packages/ml/runtime_field_utils"],

View file

@ -7,9 +7,3 @@
export { getSnappedWindowParameters, getWindowParameters } from './src/get_window_parameters';
export type { WindowParameters } from './src/get_window_parameters';
export { streamFactory } from './src/stream_factory';
export { useFetchStream } from './src/use_fetch_stream';
export type {
UseFetchStreamCustomReducerParams,
UseFetchStreamParamsDefault,
} from './src/use_fetch_stream';

View file

@ -1,148 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ReducerAction } from 'react';
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import type { UseFetchStreamParamsDefault } from './use_fetch_stream';
type GeneratorError = string | null;
/**
* Uses `fetch` and `getReader` to receive an API call as a stream with multiple chunks
* as soon as they are available. `fetchStream` is implemented as a generator that will
* yield/emit chunks and can be consumed for example like this:
*
* ```js
* for await (const [error, chunk] of fetchStream(...) {
* ...
* }
* ```
*
* @param endpoint The API endpoint including the Kibana basepath.
* @param apiVersion - The API version to be used.
* @param abortCtrl Abort controller for cancelling the request.
* @param body The request body. For now all requests are POST.
* @param ndjson Boolean flag to receive the stream as a raw string or NDJSON.
* @param bufferBounce A buffer timeout which defaults to 100ms. This collects stream
* chunks for the time of the timeout and only then yields/emits them.
* This is useful so we are more in control of passing on data to
* consuming React components and we won't hammer the DOM with
* updates on every received chunk.
*
* @returns - Yields/emits items in the format [error, value]
* inspired by node's recommended error convention for callbacks.
*/
export async function* fetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: string,
abortCtrl: React.MutableRefObject<AbortController>,
body: I['body'],
ndjson = true,
bufferBounce = 100
): AsyncGenerator<
[GeneratorError, ReducerAction<I['reducer']> | Array<ReducerAction<I['reducer']>> | undefined]
> {
let stream: Response;
try {
stream = await fetch(endpoint, {
signal: abortCtrl.current.signal,
method: 'POST',
headers: {
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
[ELASTIC_HTTP_VERSION_HEADER]: apiVersion,
'kbn-xsrf': 'stream',
},
...(Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),
});
} catch (error) {
yield [error.toString(), undefined];
return;
}
if (!stream.ok) {
yield [`Error ${stream.status}: ${stream.statusText}`, undefined];
return;
}
if (stream.body !== null) {
// Note that Firefox 99 doesn't support `TextDecoderStream` yet.
// That's why we skip it here and use `TextDecoder` later to decode each chunk.
// Once Firefox supports it, we can use the following alternative:
// const reader = stream.body.pipeThrough(new TextDecoderStream()).getReader();
const reader = stream.body.getReader();
let partial = '';
let actionBuffer: Array<ReducerAction<I['reducer']>> = [];
let lastCall = 0;
while (true) {
try {
const { value: uint8array, done } = await reader.read();
if (done) break;
const value = new TextDecoder().decode(uint8array);
const full = `${partial}${value}`;
const parts = ndjson ? full.split('\n') : [full];
const last = ndjson ? parts.pop() : '';
partial = last ?? '';
const actions = (ndjson ? parts.map((p) => JSON.parse(p)) : parts) as Array<
ReducerAction<I['reducer']>
>;
actionBuffer.push(...actions);
const now = Date.now();
if (now - lastCall >= bufferBounce && actionBuffer.length > 0) {
yield [null, actionBuffer];
actionBuffer = [];
lastCall = now;
// In cases where the next chunk takes longer to be received than the `bufferBounce` timeout,
// we trigger this client side timeout to clear a potential intermediate buffer state.
// Since `yield` cannot be passed on to other scopes like callbacks,
// this pattern using a Promise is used to wait for the timeout.
yield new Promise<
[
GeneratorError,
ReducerAction<I['reducer']> | Array<ReducerAction<I['reducer']>> | undefined
]
>((resolve) => {
setTimeout(() => {
if (actionBuffer.length > 0) {
resolve([null, actionBuffer]);
actionBuffer = [];
lastCall = now;
} else {
resolve([null, []]);
}
}, bufferBounce + 10);
});
}
} catch (error) {
if (error.name !== 'AbortError') {
yield [error.toString(), undefined];
}
break;
}
}
// The stream reader might finish with a partially filled actionBuffer so
// we need to clear it once more after the request is done.
if (actionBuffer.length > 0) {
yield [null, actionBuffer];
actionBuffer.length = 0;
}
}
}

View file

@ -1,235 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
useEffect,
useReducer,
useRef,
useState,
Dispatch,
Reducer,
ReducerAction,
ReducerState,
} from 'react';
import { fetchStream } from './fetch_stream';
import { stringReducer, StringReducer } from './string_reducer';
/**
* Custom hook type definition of the base params for an NDJSON stream with custom reducer.
*
* @export
* @interface UseFetchStreamCustomReducerParams
* @typedef {UseFetchStreamCustomReducerParams}
*/
export interface UseFetchStreamCustomReducerParams {
/**
* API endpoint
* @type {string}
*/
endpoint: string;
/**
* API version
* @type {string}
*/
apiVersion: string;
/**
* Request body
* @type {object}
*/
body: object;
/**
* Reducer function to be applied to response chunks.
* @type {Reducer<any, any>}
*/
reducer: Reducer<any, any>;
}
/**
* Custom hook type definition of the base params for a string base stream without a custom reducer.
*/
export interface UseFetchStreamParamsDefault {
/**
* API endpoint
* @type {string}
*/
endpoint: string;
/**
* API version
* @type {string}
*/
apiVersion: string;
/**
* Request body
* @type {object}
*/
body: object;
/**
* Reducer function to be applied to response chunks.
* @type {StringReducer}
*/
reducer: StringReducer;
}
/**
* The return type of the `useFetchStream` hook.
*
* @interface UseFetchStreamReturnType
* @typedef {UseFetchStreamReturnType}
* @template Data
* @template Action
*/
interface UseFetchStreamReturnType<Data, Action> {
cancel: () => void;
data: Data;
dispatch: Dispatch<Action>;
errors: string[];
isCancelled: boolean;
isRunning: boolean;
start: () => Promise<void>;
}
/**
* This overload allows us to fall back to a simple reducer that
* just acts on a string as the reducer state if no options are supplied.
*
* @export
* @template I
* @template BasePath
* @param {`${I['endpoint']}`} endpoint - API endpoint including Kibana base path.
* @param {I['apiVersion']} apiVersion - API version.
* @param {I['body']} body - API request body.
* @returns {UseFetchStreamReturnType<string, ReducerAction<I['reducer']>>} - An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: I['apiVersion'],
body: I['body']
): UseFetchStreamReturnType<string, ReducerAction<I['reducer']>>;
/**
* This overload covers passing in options and will use
* a custom reducer with appropriate type support.
*
* @export
* @template I
* @template BasePath
* @param {`${I['endpoint']}`} endpoint - API endpoint including Kibana base path.
* @param {I['apiVersion']} apiVersion - API version.
* @param {I['body']} body - API request body.
* @param {{ reducer: I['reducer']; initialState: ReducerState<I['reducer']> }} options - Custom reducer and initial state.
* @returns {UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>>} - An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<
I extends UseFetchStreamCustomReducerParams,
BasePath extends string
>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: I['apiVersion'],
body: I['body'],
options: {
/**
* Custom reducer
* @type {I['reducer']}
*/
reducer: I['reducer'];
/**
* Initial state
* @type {ReducerState<I['reducer']>}
*/
initialState: ReducerState<I['reducer']>;
}
): UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>>;
/**
* Custom hook to receive streaming data.
*
* @param endpoint - API endpoint including Kibana base path.
* @param apiVersion - API version.
* @param body - API request body.
* @param options - Optional custom reducer and initial state.
* @returns An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: string,
body: I['body'],
options?: {
/**
* Custom reducer
* @type {I['reducer']}
*/
reducer: I['reducer'];
/**
* Initial state
* @type {ReducerState<I['reducer']>}
*/
initialState: ReducerState<I['reducer']>;
}
): UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>> {
const [errors, setErrors] = useState<string[]>([]);
const [isCancelled, setIsCancelled] = useState(false);
const [isRunning, setIsRunning] = useState(false);
const reducer = (options?.reducer ?? stringReducer) as I['reducer'];
const initialState = (options?.initialState ?? '') as ReducerState<I['reducer']>;
const [data, dispatch] = useReducer(reducer, initialState);
const abortCtrl = useRef(new AbortController());
const addError = (error: string) => {
setErrors((prevErrors) => [...prevErrors, error]);
};
const start = async () => {
if (isRunning) {
addError('Restart not supported yet.');
return;
}
setErrors([]);
setIsRunning(true);
setIsCancelled(false);
abortCtrl.current = new AbortController();
for await (const [fetchStreamError, actions] of fetchStream<
UseFetchStreamCustomReducerParams,
BasePath
>(endpoint, apiVersion, abortCtrl, body, options !== undefined)) {
if (fetchStreamError !== null) {
addError(fetchStreamError);
} else if (actions.length > 0) {
dispatch(actions as ReducerAction<I['reducer']>);
}
}
setIsRunning(false);
};
const cancel = () => {
abortCtrl.current.abort();
setIsCancelled(true);
setIsRunning(false);
};
// If components using this custom hook get unmounted, cancel any ongoing request.
useEffect(() => {
return () => abortCtrl.current.abort();
}, []);
return {
cancel,
data,
dispatch,
errors,
isCancelled,
isRunning,
start,
};
}

View file

@ -14,9 +14,6 @@
"**/*.tsx",
],
"kbn_references": [
"@kbn/logging",
"@kbn/core-http-server",
"@kbn/core-http-common",
"@kbn/ml-is-populated-object",
],
"exclude": [

View file

@ -0,0 +1,11 @@
# @kbn/ml-response-stream
This package provides utilities to create HTTP streaming endpoints.
- Supports optional `gzip` compression.
- Streams can be plain strings or NDJSON.
- The provided custom hook `useFetchStream()` supports debouncing to avoid flooding the DOM with lots of small incremental updates. The hook also takes care of handling potential partial chunks.
The package does not expose `index.ts` at its root, instead there's a `client` and `server` directory you should deep-import from.
For more details and examples on how to use the package please refer to `examples/response_stream/README.md`.

View file

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import startsWith from 'lodash/startsWith';
import type { Reducer, ReducerAction } from 'react';
import type { HttpSetup } from '@kbn/core/public';
type GeneratorError = string | null;
/**
* Uses `fetch` and `getReader` to receive an API call as a stream with multiple chunks
* as soon as they are available. `fetchStream` is implemented as a generator that will
* yield/emit chunks and can be consumed for example like this:
*
* ```js
* for await (const [error, chunk] of fetchStream(...) {
* ...
* }
* ```
*
* Note on the use of `any`:
* The generic `R` extends from `Reducer<any, any>`
* to match the definition in React itself.
*
* @param endpoint The API endpoint including the Kibana basepath.
* @param apiVersion - Optional API version to be used.
* @param abortCtrl Abort controller for cancelling the request.
* @param body The request body. For now all requests are POST.
* @param ndjson Boolean flag to receive the stream as a raw string or NDJSON.
*
* @returns - Yields/emits items in the format [error, value]
* inspired by node's recommended error convention for callbacks.
*/
export async function* fetchStream<B extends object, R extends Reducer<any, any>>(
http: HttpSetup,
endpoint: string,
apiVersion: string | undefined,
abortCtrl: React.MutableRefObject<AbortController>,
body?: B,
ndjson = true
): AsyncGenerator<[GeneratorError, ReducerAction<R> | Array<ReducerAction<R>> | undefined]> {
let stream: Readonly<Response> | undefined;
try {
const response = await http.post(endpoint, {
signal: abortCtrl.current.signal,
version: apiVersion,
asResponse: true,
rawResponse: true,
...(body && Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),
});
stream = response.response;
} catch (error) {
yield [error.toString(), undefined];
return;
}
if (!stream) {
yield [`Error: Response was undefined`, undefined];
return;
}
if (!stream.ok) {
yield [`Error ${stream.status}: ${stream.statusText}`, undefined];
return;
}
if (stream.body !== null) {
// Note that Firefox 99 doesn't support `TextDecoderStream` yet.
// That's why we skip it here and use `TextDecoder` later to decode each chunk.
// Once Firefox supports it, we can use the following alternative:
// const reader = stream.body.pipeThrough(new TextDecoderStream()).getReader();
const reader = stream.body.getReader();
let partial = '';
while (true) {
try {
const { value: uint8array, done } = await reader.read();
if (done) break;
const value = new TextDecoder().decode(uint8array);
const full = `${partial}${value}`;
const parts = ndjson ? full.split('\n') : [full];
const last = ndjson ? parts.pop() : '';
partial = last ?? '';
const actions = (
ndjson
? parts
.map((p) => {
// Check if the response is an `event: ` or `data: ` prefixed SSE event.
// Note this is a workaround, we don't have actual support for SSE events yet.
if (p === '' || startsWith(p, 'event: ') || p === 'data: [DONE]') {
return '[IGNORE]';
} else if (startsWith(p, 'data: ')) {
return JSON.parse(p.split('data: ')[1]);
}
return JSON.parse(p);
})
.filter((p) => p !== '[IGNORE]')
: parts
) as Array<ReducerAction<R>>;
yield [null, actions];
} catch (error) {
if (error.name !== 'AbortError') {
yield [error.toString(), undefined];
}
break;
}
}
}
}

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { useFetchStream } from './use_fetch_stream';

View file

@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
useEffect,
useReducer,
useRef,
useState,
type Reducer,
type ReducerAction,
type ReducerState,
} from 'react';
import useThrottle from 'react-use/lib/useThrottle';
import type { HttpSetup } from '@kbn/core/public';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import { fetchStream } from './fetch_stream';
import { stringReducer, type StringReducer } from './string_reducer';
// This pattern with a dual ternary allows us to default to StringReducer
// and if a custom reducer is supplied fall back to that one instead.
// The complexity in here allows us to create a simpler API surface where
// these generics can be infered from the arguments and don't have to be
// supplied additionally. Note on the use of `any`: `Reducer<any, any>`
// is used to match the type definition in React itself.
type CustomReducer<T> = T extends StringReducer
? StringReducer
: T extends Reducer<any, any>
? T
: never;
// Wrapped reducer options in the format they need to be passed in as arguments.
interface FetchStreamCustomReducer<T> {
reducer: CustomReducer<T>;
initialState: ReducerState<CustomReducer<T>>;
}
// Type guard for custom reducer hook argument
function isReducerOptions<T>(arg: unknown): arg is CustomReducer<T> {
return isPopulatedObject(arg, ['reducer', 'initialState']);
}
/**
* Custom hook to receive streaming data.
*
* Note on the use of `any`:
* The generic `R` extends from `Reducer<any, any>`
* to match the definition in React itself.
*
* @param http Kibana HTTP client.
* @param endpoint API endpoint including Kibana base path.
* @param apiVersion Optional API version.
* @param body Optional API request body.
* @param customReducer Optional custom reducer and initial state.
* @returns An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<B extends object, R extends Reducer<any, any>>(
http: HttpSetup,
endpoint: string,
apiVersion?: string,
body?: B,
customReducer?: FetchStreamCustomReducer<R>
) {
const [errors, setErrors] = useState<string[]>([]);
const [isCancelled, setIsCancelled] = useState(false);
const [isRunning, setIsRunning] = useState(false);
const reducerWithFallback = isReducerOptions(customReducer)
? customReducer
: ({ reducer: stringReducer, initialState: '' } as FetchStreamCustomReducer<R>);
const [data, dispatch] = useReducer(
reducerWithFallback.reducer,
reducerWithFallback.initialState
);
const dataThrottled = useThrottle(data, 100);
const abortCtrl = useRef(new AbortController());
const addError = (error: string) => {
setErrors((prevErrors) => [...prevErrors, error]);
};
const start = async () => {
if (isRunning) {
addError('Instant restart while running not supported yet.');
return;
}
setErrors([]);
setIsRunning(true);
setIsCancelled(false);
abortCtrl.current = new AbortController();
for await (const [fetchStreamError, actions] of fetchStream<B, CustomReducer<R>>(
http,
endpoint,
apiVersion,
abortCtrl,
body,
customReducer !== undefined
)) {
if (fetchStreamError !== null) {
addError(fetchStreamError);
} else if (Array.isArray(actions) && actions.length > 0) {
dispatch(actions as ReducerAction<CustomReducer<R>>);
}
}
setIsRunning(false);
};
const cancel = () => {
abortCtrl.current.abort();
setIsCancelled(true);
setIsRunning(false);
};
// If components using this custom hook get unmounted, cancel any ongoing request.
useEffect(() => {
return () => abortCtrl.current.abort();
}, []);
return {
cancel,
data: dataThrottled,
dispatch,
errors,
isCancelled,
isRunning,
start,
};
}

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
module.exports = {
preset: '@kbn/test',
rootDir: '../../../..',
roots: ['<rootDir>/x-pack/packages/ml/response_stream'],
};

View file

@ -0,0 +1,5 @@
{
"type": "shared-common",
"id": "@kbn/ml-response-stream",
"owner": "@elastic/ml-ui"
}

View file

@ -0,0 +1,6 @@
{
"name": "@kbn/ml-response-stream",
"private": true,
"version": "1.0.0",
"license": "Elastic License 2.0"
}

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { streamFactory } from './stream_factory';

View file

@ -56,7 +56,7 @@ export function streamFactory<T = string>(
): StreamFactoryReturnType<T>;
/**
* Sets up a response stream with support for gzip compression depending on provided
* request headers. Any non-string data pushed to the stream will be stream as NDJSON.
* request headers. Any non-string data pushed to the stream will be streamed as NDJSON.
*
* @param headers - Request headers.
* @param logger - Kibana logger.

View file

@ -0,0 +1,24 @@
{
"extends": "../../../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types",
"types": [
"jest",
"node",
"react"
]
},
"include": [
"**/*.ts",
"**/*.tsx",
],
"exclude": [
"target/**/*"
],
"kbn_references": [
"@kbn/core",
"@kbn/core-http-server",
"@kbn/logging",
"@kbn/ml-is-populated-object",
]
}

View file

@ -5,6 +5,8 @@
* 2.0.
*/
import type { HttpSetup } from '@kbn/core/public';
import type {
AiopsLogRateAnalysisSchema,
AiopsLogRateAnalysisApiAction,
@ -19,6 +21,7 @@ type AiopsApiEndpointKeys = keyof typeof AIOPS_API_ENDPOINT;
export type AiopsApiEndpoint = typeof AIOPS_API_ENDPOINT[AiopsApiEndpointKeys];
export interface AiopsApiLogRateAnalysis {
http: HttpSetup;
endpoint: AiopsApiEndpoint;
apiVersion: string;
reducer: typeof streamReducer;

View file

@ -23,7 +23,7 @@ import {
import type { DataView } from '@kbn/data-views-plugin/public';
import { ProgressControls } from '@kbn/aiops-components';
import { useFetchStream } from '@kbn/aiops-utils';
import { useFetchStream } from '@kbn/ml-response-stream/client';
import type { WindowParameters } from '@kbn/aiops-utils';
import { i18n } from '@kbn/i18n';
import { FormattedMessage } from '@kbn/i18n-react';
@ -117,7 +117,6 @@ export const LogRateAnalysisResults: FC<LogRateAnalysisResultsProps> = ({
onAnalysisCompleted,
}) => {
const { http } = useAiopsAppContext();
const basePath = http.basePath.get() ?? '';
const { clearAllRowState } = useLogRateAnalysisResultsTableRowContext();
@ -158,8 +157,9 @@ export const LogRateAnalysisResults: FC<LogRateAnalysisResultsProps> = ({
data,
isRunning,
errors: streamErrors,
} = useFetchStream<AiopsApiLogRateAnalysis, typeof basePath>(
`${basePath}/internal/aiops/log_rate_analysis`,
} = useFetchStream(
http,
'/internal/aiops/log_rate_analysis',
'1',
{
start: earliest,

View file

@ -14,7 +14,7 @@ import type { CoreStart, IRouter } from '@kbn/core/server';
import { KBN_FIELD_TYPES } from '@kbn/field-types';
import type { Logger } from '@kbn/logging';
import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
import { streamFactory } from '@kbn/aiops-utils';
import { streamFactory } from '@kbn/ml-response-stream/server';
import type {
SignificantTerm,
SignificantTermGroup,

View file

@ -53,6 +53,7 @@
"@kbn/utility-types",
"@kbn/ml-kibana-theme",
"@kbn/unified-field-list",
"@kbn/ml-response-stream",
],
"exclude": [
"target/**/*",

View file

@ -4726,6 +4726,10 @@
version "0.0.0"
uid ""
"@kbn/ml-response-stream@link:x-pack/packages/ml/response_stream":
version "0.0.0"
uid ""
"@kbn/ml-route-utils@link:x-pack/packages/ml/route_utils":
version "0.0.0"
uid ""