[ML] Explain log rate spikes: Move API stream demos to Kibana examples. (#132590)

This creates a response_stream plugin in the Kibana /examples section. The plugin demonstrates API endpoints that can stream chunks of data to the client over a single request, with gzip compression support; gzip streams are decompressed natively by browsers. The plugin covers two use cases to get started: streaming a raw string, and a more complex example that streams Redux-like actions to the client, which update React state via useReducer().
Walter Rafelsberger 2022-05-24 16:59:31 +02:00 committed by GitHub
parent c9b1832654
commit c968e508f6
69 changed files with 1706 additions and 738 deletions

.github/CODEOWNERS

@@ -187,7 +187,7 @@
/x-pack/test/screenshot_creation/apps/ml_docs @elastic/ml-ui
/x-pack/test/screenshot_creation/services/ml_screenshots.ts @elastic/ml-ui
# Additional plugins maintained by the ML team.
# Additional plugins and packages maintained by the ML team.
/x-pack/plugins/aiops/ @elastic/ml-ui
/x-pack/plugins/data_visualizer/ @elastic/ml-ui
/x-pack/plugins/file_upload/ @elastic/ml-ui
@@ -198,6 +198,8 @@
/x-pack/test/functional/apps/transform/ @elastic/ml-ui
/x-pack/test/functional/services/transform/ @elastic/ml-ui
/x-pack/test/functional_basic/apps/transform/ @elastic/ml-ui
/packages/kbn-aiops-utils @elastic/ml-ui
/examples/response_stream/ @elastic/ml-ui
# Maps
#CC# /x-pack/plugins/maps/ @elastic/kibana-gis


@@ -0,0 +1,28 @@
## response stream
This plugin demonstrates how to stream chunks of data to the client with just a single request.
To run Kibana with the described examples, use `yarn start --run-examples`.
The `response_stream` plugin demonstrates API endpoints that can stream chunks of data to the client over a single request, with gzip compression support; gzip streams are decompressed natively by browsers. The plugin covers two use cases to get started: streaming a raw string, and a more complex example that streams Redux-like actions to the client, which update React state via `useReducer()`.
Code in `@kbn/aiops-utils` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`). The utilities make use of TS generics in a way that provides type safety for both the request-related options and the returned data.
The helpers require no additional third-party libraries. On the server, they integrate with `Hapi` and use Node's own `gzip`. On the client, the custom hook abstracts away the logic necessary to consume the stream; internally, it makes use of a generator function and `useReducer()` to update React state.
On the server, the simpler stream to send a string is set up like this:
```ts
const { end, push, responseWithHeaders } = streamFactory(request.headers);
```
The request's headers are passed on so the factory can automatically detect whether the client supports compression.
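A condensed sketch of how this plugs into a route handler, based on the `simple_string_stream` route in this plugin (abort handling and the word-by-word push loop are omitted here):
```ts
// `router`, `streamFactory` and the schema are imported as in
// server/routes/single_string_stream.ts.
router.post(
  {
    path: '/internal/response_stream/simple_string_stream',
    validate: { body: simpleStringStreamRequestBodySchema },
  },
  async (context, request, response) => {
    const { end, push, responseWithHeaders } = streamFactory(request.headers);

    // Push chunks asynchronously (not awaited), then close the stream;
    // the streaming response itself is returned right away.
    setTimeout(() => {
      push('some chunk of data ');
      end();
    }, 10);

    return response.ok(responseWithHeaders);
  }
);
```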
On the client, the custom hook is used like this:
```ts
const { error, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream, typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`);
```
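`start()` kicks off the request, `cancel()` aborts it, and `data` grows as chunks arrive. A minimal sketch of wiring these values into a component, condensed from this plugin's `PageSimpleStringStream` example (the import path for the shared types is illustrative):
```tsx
import React, { FC } from 'react';
import { useFetchStream } from '@kbn/aiops-utils';
import { ApiSimpleStringStream } from '../common/api';

export const SimpleStringStreamDemo: FC<{ basePath: string }> = ({ basePath }) => {
  const { data, error, start, cancel, isRunning } = useFetchStream<
    ApiSimpleStringStream, typeof basePath
  >(`${basePath}/internal/response_stream/simple_string_stream`, { timeout: 500 });

  return (
    <>
      <button onClick={() => (isRunning ? cancel() : start())}>
        {isRunning ? 'Stop' : 'Start'}
      </button>
      {/* `data` is the string streamed so far; `error` is set on stream failures. */}
      <p>{data}</p>
      {error !== undefined && <p>{error}</p>}
    </>
  );
};
```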


@@ -0,0 +1 @@
The `./api` folder contains shared code that allows the server and the client to work with the same type specifications.
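For example, the route and the hook are kept in sync through these shared definitions (a sketch using `API_ENDPOINT`, the request body schema and the `ApiSimpleStringStream` interface defined in this folder):
```ts
// Server: the schema validates the request body.
router.post(
  {
    path: API_ENDPOINT.SIMPLE_STRING_STREAM,
    validate: { body: simpleStringStreamRequestBodySchema },
  },
  handler
);

// Client: the same endpoint and body types parameterize the fetch hook.
useFetchStream<ApiSimpleStringStream, typeof basePath>(
  `${basePath}${API_ENDPOINT.SIMPLE_STRING_STREAM}`,
  { timeout: 500 }
);
```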


@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type {
UseFetchStreamCustomReducerParams,
UseFetchStreamParamsDefault,
} from '@kbn/aiops-utils';
import {
reducerStreamReducer,
ReducerStreamRequestBodySchema,
ReducerStreamApiAction,
} from './reducer_stream';
import { SimpleStringStreamRequestBodySchema } from './simple_string_stream';
export const API_ENDPOINT = {
REDUCER_STREAM: '/internal/response_stream/reducer_stream',
SIMPLE_STRING_STREAM: '/internal/response_stream/simple_string_stream',
} as const;
export interface ApiReducerStream extends UseFetchStreamCustomReducerParams {
endpoint: typeof API_ENDPOINT.REDUCER_STREAM;
reducer: typeof reducerStreamReducer;
body: ReducerStreamRequestBodySchema;
actions: ReducerStreamApiAction;
}
export interface ApiSimpleStringStream extends UseFetchStreamParamsDefault {
endpoint: typeof API_ENDPOINT.SIMPLE_STRING_STREAM;
body: SimpleStringStreamRequestBodySchema;
}


@@ -1,20 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { schema, TypeOf } from '@kbn/config-schema';
export const aiopsExampleStreamSchema = schema.object({
/** Boolean flag to enable/disabling simulation of response errors. */
simulateErrors: schema.maybe(schema.boolean()),
/** Maximum timeout between streaming messages. */
timeout: schema.maybe(schema.number()),
});
export type AiopsExampleStreamSchema = TypeOf<typeof aiopsExampleStreamSchema>;
export { reducerStreamReducer } from './reducer';
export { reducerStreamRequestBodySchema } from './request_body_schema';
export type { ReducerStreamRequestBodySchema } from './request_body_schema';
export const API_ACTION_NAME = {
UPDATE_PROGRESS: 'update_progress',
@@ -65,7 +59,7 @@ export function deleteEntityAction(payload: string): ApiActionDeleteEntity {
};
}
export type AiopsExampleStreamApiAction =
export type ReducerStreamApiAction =
| ApiActionUpdateProgress
| ApiActionAddToEntity
| ApiActionDeleteEntity;


@@ -1,33 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { AiopsExampleStreamApiAction, API_ACTION_NAME } from '../../../common/api/example_stream';
import { ReducerStreamApiAction, API_ACTION_NAME } from '.';
export const UI_ACTION_NAME = {
ERROR: 'error',
RESET: 'reset',
} as const;
export type UiActionName = typeof UI_ACTION_NAME[keyof typeof UI_ACTION_NAME];
export interface StreamState {
errors: string[];
progress: number;
entities: Record<string, number>;
}
export const initialState: StreamState = {
errors: [],
progress: 0,
entities: {},
};
interface UiActionError {
type: typeof UI_ACTION_NAME.ERROR;
payload: string;
}
interface UiActionResetStream {
type: typeof UI_ACTION_NAME.RESET;
}
@@ -36,14 +30,14 @@ export function resetStream(): UiActionResetStream {
return { type: UI_ACTION_NAME.RESET };
}
type UiAction = UiActionResetStream | UiActionError;
export type ReducerAction = AiopsExampleStreamApiAction | UiAction;
export function streamReducer(
type UiAction = UiActionResetStream;
export type ReducerAction = ReducerStreamApiAction | UiAction;
export function reducerStreamReducer(
state: StreamState,
action: ReducerAction | ReducerAction[]
): StreamState {
if (Array.isArray(action)) {
return action.reduce(streamReducer, state);
return action.reduce(reducerStreamReducer, state);
}
switch (action.type) {
@@ -72,15 +66,7 @@ export function streamReducer(
};
case UI_ACTION_NAME.RESET:
return initialState;
case UI_ACTION_NAME.ERROR:
return {
...state,
errors: [...state.errors, action.payload],
};
default:
return {
...state,
errors: [...state.errors, 'UNKNOWN_ACTION_ERROR'],
};
return state;
}
}


@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { schema, TypeOf } from '@kbn/config-schema';
export const reducerStreamRequestBodySchema = schema.object({
/** Boolean flag to enable/disable simulation of response errors. */
simulateErrors: schema.maybe(schema.boolean()),
/** Maximum timeout between streaming messages. */
timeout: schema.maybe(schema.number()),
});
export type ReducerStreamRequestBodySchema = TypeOf<typeof reducerStreamRequestBodySchema>;


@@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export { simpleStringStreamRequestBodySchema } from './request_body_schema';
export type { SimpleStringStreamRequestBodySchema } from './request_body_schema';


@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { schema, TypeOf } from '@kbn/config-schema';
export const simpleStringStreamRequestBodySchema = schema.object({
/** Maximum timeout between streaming messages. */
timeout: schema.number(),
});
export type SimpleStringStreamRequestBodySchema = TypeOf<
typeof simpleStringStreamRequestBodySchema
>;


@@ -0,0 +1,14 @@
{
"id": "responseStream",
"kibanaVersion": "kibana",
"version": "0.0.1",
"server": true,
"ui": true,
"owner": {
"name": "ML UI",
"githubTeam": "ml-ui"
},
"requiredPlugins": ["developerExamples"],
"optionalPlugins": [],
"requiredBundles": ["kibanaReact"]
}


@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as React from 'react';
import {
EuiPageBody,
EuiPageContent,
EuiPageContentBody,
EuiPageHeader,
EuiPageHeaderSection,
EuiTitle,
} from '@elastic/eui';
export interface PageProps {
title?: React.ReactNode;
}
export const Page: React.FC<PageProps> = ({ title = 'Untitled', children }) => {
return (
<EuiPageBody>
<EuiPageHeader>
<EuiPageHeaderSection>
<EuiTitle size="l">
<h1>{title}</h1>
</EuiTitle>
</EuiPageHeaderSection>
</EuiPageHeader>
<EuiPageContent>
<EuiPageContentBody style={{ maxWidth: 800, margin: '0 auto' }}>
{children}
</EuiPageContentBody>
</EuiPageContent>
</EuiPageBody>
);
};


@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import React from 'react';
import { BrowserRouter as Router, Route, Redirect, Switch } from 'react-router-dom';
import { EuiPage } from '@elastic/eui';
import { useDeps } from '../../hooks/use_deps';
import { Sidebar } from './sidebar';
import { routes } from '../../routes';
export const App: React.FC = () => {
const { appBasePath } = useDeps();
const routeElements: React.ReactElement[] = [];
for (const { items } of routes) {
for (const { id, component } of items) {
routeElements.push(<Route key={id} path={`/${id}`} render={(props) => component} />);
}
}
return (
<Router basename={appBasePath}>
<EuiPage>
<Sidebar />
<Switch>
{routeElements}
<Redirect to="/simple-string-stream" />
</Switch>
</EuiPage>
</Router>
);
};


@@ -1,8 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export function getStatusMessage(isRunning: boolean, isCancelled: boolean, progress: number) {
@@ -13,7 +14,7 @@ export function getStatusMessage(isRunning: boolean, isCancelled: boolean, progr
} else if (!isRunning && isCancelled) {
return 'Oh no, development got cancelled!';
} else if (!isRunning && progress === 100) {
return 'Development clompeted, the release got out the door!';
return 'Development completed, the release got out the door!';
}
// When the process stops but wasn't cancelled by the user and progress is not yet at 100%,


@@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import React, { useEffect, useState, FC } from 'react';
import { Chart, Settings, Axis, BarSeries, Position, ScaleType } from '@elastic/charts';
import {
EuiBadge,
EuiButton,
EuiCheckbox,
EuiFlexGroup,
EuiFlexItem,
EuiProgress,
EuiSpacer,
EuiText,
} from '@elastic/eui';
import { useFetchStream } from '@kbn/aiops-utils';
import { ApiReducerStream } from '../../../../../common/api';
import {
initialState,
resetStream,
reducerStreamReducer,
} from '../../../../../common/api/reducer_stream/reducer';
import { Page } from '../../../../components/page';
import { useDeps } from '../../../../hooks/use_deps';
import { getStatusMessage } from './get_status_message';
export const PageReducerStream: FC = () => {
const {
core: { http, notifications },
} = useDeps();
const basePath = http?.basePath.get() ?? '';
const [simulateErrors, setSimulateErrors] = useState(false);
const { dispatch, start, cancel, data, error, isCancelled, isRunning } = useFetchStream<
ApiReducerStream,
typeof basePath
>(
`${basePath}/internal/response_stream/reducer_stream`,
{ simulateErrors },
{ reducer: reducerStreamReducer, initialState }
);
const { progress, entities } = data;
const onClickHandler = async () => {
if (isRunning) {
cancel();
} else {
dispatch(resetStream());
start();
}
};
useEffect(() => {
if (error) {
notifications.toasts.addDanger(error);
}
}, [error, notifications.toasts]);
const buttonLabel = isRunning ? 'Stop development' : 'Start development';
return (
<Page title={'Reducer stream'}>
<EuiText>
<p>
This demonstrates a single endpoint with streaming support that sends Redux-inspired
actions from server to client. The server and client share the types of the data being
received. The client uses a custom hook that receives stream chunks and passes them on to
`useReducer()`, which acts on the Redux-style actions it receives. The custom hook buffers
actions and can apply them in bulk so the DOM does not get hammered with updates. Hit
&quot;Start development&quot; to trigger the bar chart race!
</p>
</EuiText>
<br />
<EuiFlexGroup alignItems="center">
<EuiFlexItem grow={false}>
<EuiButton type="primary" size="s" onClick={onClickHandler} aria-label={buttonLabel}>
{buttonLabel}
</EuiButton>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<EuiText>
<EuiBadge>{progress}%</EuiBadge>
</EuiText>
</EuiFlexItem>
<EuiFlexItem>
<EuiProgress value={progress} max={100} size="xs" />
</EuiFlexItem>
</EuiFlexGroup>
<EuiSpacer />
<div style={{ height: '300px' }}>
<Chart>
<Settings rotation={90} />
<Axis id="entities" position={Position.Bottom} title="Commits" showOverlappingTicks />
<Axis id="left2" title="Developers" position={Position.Left} />
<BarSeries
id="commits"
xScaleType={ScaleType.Linear}
yScaleType={ScaleType.Linear}
xAccessor="x"
yAccessors={['y']}
data={Object.entries(entities)
.map(([x, y]) => {
return {
x,
y,
};
})
.sort((a, b) => b.y - a.y)}
/>
</Chart>
</div>
<EuiText>
<p>{getStatusMessage(isRunning, isCancelled, data.progress)}</p>
<EuiCheckbox
id="responseStreamSimulateErrorsCheckbox"
label="Simulate errors (gets applied to new streams only, not currently running ones)."
checked={simulateErrors}
onChange={(e) => setSimulateErrors(!simulateErrors)}
compressed
/>
</EuiText>
</Page>
);
};


@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import React, { FC } from 'react';
import { EuiButton, EuiCallOut, EuiFlexGroup, EuiFlexItem, EuiSpacer, EuiText } from '@elastic/eui';
import { useFetchStream } from '@kbn/aiops-utils';
import { ApiSimpleStringStream } from '../../../../../common/api';
import { useDeps } from '../../../../hooks/use_deps';
import { Page } from '../../../../components/page';
export const PageSimpleStringStream: FC = () => {
const { core } = useDeps();
const basePath = core.http?.basePath.get() ?? '';
const { dispatch, error, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream,
typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`, { timeout: 500 });
const onClickHandler = async () => {
if (isRunning) {
cancel();
} else {
// Passing in undefined will reset `data` to an empty string.
dispatch(undefined);
start();
}
};
const buttonLabel = isRunning ? 'Stop' : 'Start';
return (
<Page title="Simple string stream">
<EuiText>
<p>
This demonstrates a single endpoint with streaming support that sends chunks of a
string from server to client. The client uses a custom hook that receives stream chunks
and passes them on to `useReducer()`, which acts on the string chunks it receives. The
custom hook buffers chunks and can apply them in bulk so the DOM
does not get hammered with updates. Hit &quot;Start&quot; to trigger the stream!
</p>
</EuiText>
<br />
<EuiFlexGroup alignItems="center">
<EuiFlexItem grow={false}>
<EuiButton type="primary" size="s" onClick={onClickHandler} aria-label={buttonLabel}>
{buttonLabel}
</EuiButton>
</EuiFlexItem>
</EuiFlexGroup>
<EuiSpacer />
<EuiText>
<p>{data}</p>
</EuiText>
{error && (
<EuiCallOut title="Sorry, there was an error" color="danger" iconType="alert">
<p>{error}</p>
</EuiCallOut>
)}
</Page>
);
};


@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import React from 'react';
import { EuiPageSideBar, EuiSideNav } from '@elastic/eui';
import { useHistory } from 'react-router-dom';
import { routes } from '../../routes';
export const Sidebar: React.FC = () => {
const history = useHistory();
return (
<EuiPageSideBar>
<EuiSideNav
items={routes.map(({ id, title, items }) => ({
id,
name: title,
isSelected: true,
items: items.map((route) => ({
id: route.id,
name: route.title,
onClick: () => history.push(`/${route.id}`),
'data-test-subj': route.id,
})),
}))}
/>
</EuiPageSideBar>
);
};


@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { useKibana } from '@kbn/kibana-react-plugin/public';
import type { ResponseStreamDeps } from '../mount';
export const useDeps = () => useKibana().services as ResponseStreamDeps;


@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ResponseStreamPlugin } from './plugin';
export const plugin = () => new ResponseStreamPlugin();


@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import * as React from 'react';
import { render, unmountComponentAtNode } from 'react-dom';
import { CoreSetup, CoreStart, AppMountParameters } from '@kbn/core/public';
import { KibanaContextProvider } from '@kbn/kibana-react-plugin/public';
import { ResponseStreamStartPlugins } from './plugin';
import { App } from './containers/app';
export interface ResponseStreamDeps {
appBasePath: string;
core: CoreStart;
plugins: ResponseStreamStartPlugins;
}
export const mount =
(coreSetup: CoreSetup<ResponseStreamStartPlugins>) =>
async ({ appBasePath, element }: AppMountParameters) => {
const [core, plugins] = await coreSetup.getStartServices();
const deps: ResponseStreamDeps = { appBasePath, core, plugins };
const reactElement = (
<KibanaContextProvider services={deps}>
<App />
</KibanaContextProvider>
);
render(reactElement, element);
return () => unmountComponentAtNode(element);
};


@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Plugin, CoreSetup, AppNavLinkStatus } from '@kbn/core/public';
import { DeveloperExamplesSetup } from '@kbn/developer-examples-plugin/public';
import { mount } from './mount';
export interface ResponseStreamSetupPlugins {
developerExamples: DeveloperExamplesSetup;
}
// eslint-disable-next-line
export interface ResponseStreamStartPlugins {}
export class ResponseStreamPlugin implements Plugin {
public setup(
core: CoreSetup<ResponseStreamStartPlugins, void>,
{ developerExamples }: ResponseStreamSetupPlugins
) {
core.application.register({
id: 'response-stream',
title: 'response stream',
navLinkStatus: AppNavLinkStatus.hidden,
mount: mount(core),
});
developerExamples.register({
appId: 'response-stream',
title: 'response stream',
description:
'This example demonstrates how to stream chunks of data to the client with just a single request.',
links: [
{
label: 'README',
href: 'https://github.com/elastic/kibana/blob/main/examples/response_stream/README.md',
iconType: 'logoGithub',
size: 's',
target: '_blank',
},
],
});
}
public start() {}
public stop() {}
}


@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import React from 'react';
import { PageSimpleStringStream } from './containers/app/pages/page_simple_string_stream';
import { PageReducerStream } from './containers/app/pages/page_reducer_stream';
interface RouteSectionDef {
title: string;
id: string;
items: RouteDef[];
}
interface RouteDef {
title: string;
id: string;
component: React.ReactNode;
}
export const routes: RouteSectionDef[] = [
{
title: 'response stream',
id: 'responseStream',
items: [
{
title: 'Simple string stream',
id: 'simple-string-stream',
component: <PageSimpleStringStream />,
},
{
title: 'Reducer stream',
id: 'reducer-stream',
component: <PageReducerStream />,
},
],
},
];


@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { PluginInitializerContext } from '@kbn/core/server';
import { ResponseStreamPlugin } from './plugin';
export function plugin(initializerContext: PluginInitializerContext) {
return new ResponseStreamPlugin(initializerContext);
}


@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Plugin, PluginInitializerContext, CoreSetup, CoreStart, Logger } from '@kbn/core/server';
import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
import { defineReducerStreamRoute, defineSimpleStringStreamRoute } from './routes';
// eslint-disable-next-line
export interface ResponseStreamSetupPlugins {}
// eslint-disable-next-line
export interface ResponseStreamStartPlugins {}
export class ResponseStreamPlugin implements Plugin {
private readonly logger: Logger;
constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
}
public setup(core: CoreSetup, plugins: ResponseStreamSetupPlugins) {
const router = core.http.createRouter<DataRequestHandlerContext>();
core.getStartServices().then(([_, depsStart]) => {
defineReducerStreamRoute(router, this.logger);
defineSimpleStringStreamRoute(router, this.logger);
});
}
public start(core: CoreStart, plugins: ResponseStreamStartPlugins) {}
public stop() {}
}


@@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export { defineReducerStreamRoute } from './reducer_stream';
export { defineSimpleStringStreamRoute } from './single_string_stream';


@@ -1,28 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { IRouter, Logger } from '@kbn/core/server';
import { streamFactory } from '@kbn/aiops-utils';
import {
aiopsExampleStreamSchema,
reducerStreamRequestBodySchema,
updateProgressAction,
addToEntityAction,
deleteEntityAction,
} from '../../common/api/example_stream';
ReducerStreamApiAction,
} from '../../common/api/reducer_stream';
import { API_ENDPOINT } from '../../common/api';
import { streamFactory } from '../lib/stream_factory';
export const defineExampleStreamRoute = (router: IRouter, logger: Logger) => {
export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
router.post(
{
path: API_ENDPOINT.EXAMPLE_STREAM,
path: API_ENDPOINT.REDUCER_STREAM,
validate: {
body: aiopsExampleStreamSchema,
body: reducerStreamRequestBodySchema,
},
},
async (context, request, response) => {
@@ -37,9 +38,9 @@ export const defineExampleStreamRoute = (router: IRouter, logger: Logger) => {
shouldStop = true;
});
const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
typeof API_ENDPOINT.EXAMPLE_STREAM
>(logger, request.headers);
const { end, error, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers
);
const entities = [
'kimchy',
@@ -55,9 +56,8 @@
const actions = [...Array(19).fill('add'), 'delete'];
if (simulateError) {
actions.push('server-only-error');
actions.push('server-to-client-error');
actions.push('client-error');
actions.push('throw-error');
actions.push('emit-error');
}
let progress = 0;
@@ -82,20 +82,20 @@
push(addToEntityAction(randomEntity, randomCommits));
} else if (randomAction === 'delete') {
push(deleteEntityAction(randomEntity));
} else if (randomAction === 'server-to-client-error') {
} else if (randomAction === 'throw-error') {
// Throw an error. It should not crash Kibana!
// It should be caught, logged and passed on as a stream error.
throw new Error('There was a (simulated) server side error!');
} else if (randomAction === 'client-error') {
// Return not properly encoded JSON to the client.
stream.push(`{body:'Not valid JSON${DELIMITER}`);
} else if (randomAction === 'emit-error') {
// Directly emit an error to the stream, this will not be logged.
error('Error pushed to the stream');
return;
}
pushStreamUpdate();
} catch (error) {
stream.push(
`${JSON.stringify({ type: 'error', payload: error.toString() })}${DELIMITER}`
);
end();
} catch (e) {
logger.error(e);
error(e);
}
}, Math.floor(Math.random() * maxTimeoutMs));
}


@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { IRouter, Logger } from '@kbn/core/server';
import { streamFactory } from '@kbn/aiops-utils';
import { simpleStringStreamRequestBodySchema } from '../../common/api/simple_string_stream';
import { API_ENDPOINT } from '../../common/api';
function timeout(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) => {
router.post(
{
path: API_ENDPOINT.SIMPLE_STRING_STREAM,
validate: {
body: simpleStringStreamRequestBodySchema,
},
},
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;
let shouldStop = false;
request.events.aborted$.subscribe(() => {
shouldStop = true;
});
request.events.completed$.subscribe(() => {
shouldStop = true;
});
const { end, error, push, responseWithHeaders } = streamFactory(request.headers);
const text =
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';
const tokens = text.split(' ');
async function pushStreamUpdate() {
try {
if (shouldStop) {
end();
return;
}
const token = tokens.shift();
if (token !== undefined) {
push(`${token} `);
await timeout(Math.floor(Math.random() * maxTimeoutMs));
if (!shouldStop) {
pushStreamUpdate();
}
} else {
end();
}
} catch (e) {
error(`There was an error: ${e.toString()}`);
}
}
// Do not call this using `await` so it runs asynchronously while we already return the stream below.
pushStreamUpdate();
return response.ok(responseWithHeaders);
}
);
};


@@ -0,0 +1,21 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "./target/types",
},
"include": [
"index.ts",
"common/**/*.ts",
"public/**/*.ts",
"public/**/*.tsx",
"server/**/*.ts",
"../../typings/**/*",
],
"exclude": [],
"references": [
{ "path": "../../src/core/tsconfig.json" },
{ "path": "../developer_examples/tsconfig.json" },
{ "path": "../../src/plugins/data/tsconfig.json" },
{ "path": "../../src/plugins/kibana_react/tsconfig.json" },
]
}


@@ -131,6 +131,7 @@
"@hapi/inert": "^6.0.4",
"@hapi/wreck": "^17.1.0",
"@kbn/ace": "link:bazel-bin/packages/kbn-ace",
"@kbn/aiops-utils": "link:bazel-bin/packages/kbn-aiops-utils",
"@kbn/alerts": "link:bazel-bin/packages/kbn-alerts",
"@kbn/ambient-storybook-types": "link:bazel-bin/packages/kbn-ambient-storybook-types",
"@kbn/ambient-ui-types": "link:bazel-bin/packages/kbn-ambient-ui-types",
@@ -615,6 +616,7 @@
"@types/json-stable-stringify": "^1.0.32",
"@types/json5": "^0.0.30",
"@types/kbn__ace": "link:bazel-bin/packages/kbn-ace/npm_module_types",
"@types/kbn__aiops-utils": "link:bazel-bin/packages/kbn-aiops-utils/npm_module_types",
"@types/kbn__alerts": "link:bazel-bin/packages/kbn-alerts/npm_module_types",
"@types/kbn__analytics": "link:bazel-bin/packages/kbn-analytics/npm_module_types",
"@types/kbn__analytics-client": "link:bazel-bin/packages/analytics/client/npm_module_types",


@@ -17,6 +17,7 @@ filegroup(
"//packages/elastic-apm-synthtrace:build",
"//packages/elastic-safer-lodash-set:build",
"//packages/kbn-ace:build",
"//packages/kbn-aiops-utils:build",
"//packages/kbn-alerts:build",
"//packages/kbn-ambient-storybook-types:build",
"//packages/kbn-ambient-ui-types:build",
@@ -133,6 +134,7 @@ filegroup(
"//packages/elastic-apm-synthtrace:build_types",
"//packages/elastic-safer-lodash-set:build_types",
"//packages/kbn-ace:build_types",
"//packages/kbn-aiops-utils:build_types",
"//packages/kbn-alerts:build_types",
"//packages/kbn-analytics:build_types",
"//packages/kbn-apm-config-loader:build_types",


@@ -0,0 +1,126 @@
load("@npm//@bazel/typescript:index.bzl", "ts_config")
load("@build_bazel_rules_nodejs//:index.bzl", "js_library")
load("//src/dev/bazel:index.bzl", "jsts_transpiler", "pkg_npm", "pkg_npm_types", "ts_project")
PKG_DIRNAME = "kbn-aiops-utils"
PKG_REQUIRE_NAME = "@kbn/aiops-utils"
SOURCE_FILES = glob(
[
"src/**/*.ts",
"src/**/*.tsx",
],
exclude = [
"**/*.test.*",
],
)
SRCS = SOURCE_FILES
filegroup(
name = "srcs",
srcs = SRCS,
)
NPM_MODULE_EXTRA_FILES = [
"package.json",
]
# In this array place runtime dependencies, including other packages and NPM packages
# which must be available for this code to run.
#
# To reference other packages use:
# "//repo/relative/path/to/package"
# eg. "//packages/kbn-utils"
#
# To reference a NPM package use:
# "@npm//name-of-package"
# eg. "@npm//lodash"
RUNTIME_DEPS = [
"//packages/kbn-logging",
"@npm//react"
]
# In this array place dependencies necessary to build the types, which will include the
# :npm_module_types target of other packages and packages from NPM, including @types/*
# packages.
#
# To reference the types for another package use:
# "//repo/relative/path/to/package:npm_module_types"
# eg. "//packages/kbn-utils:npm_module_types"
#
# References to NPM packages work the same as RUNTIME_DEPS
TYPES_DEPS = [
"//packages/kbn-logging:npm_module_types",
"@npm//@types/node",
"@npm//@types/jest",
"@npm//@types/react"
]
jsts_transpiler(
name = "target_node",
srcs = SRCS,
build_pkg_name = package_name(),
)
jsts_transpiler(
name = "target_web",
srcs = SRCS,
build_pkg_name = package_name(),
web = True,
)
ts_config(
name = "tsconfig",
src = "tsconfig.json",
deps = [
"//:tsconfig.base.json",
"//:tsconfig.bazel.json",
],
)
ts_project(
name = "tsc_types",
args = ['--pretty'],
srcs = SRCS,
deps = TYPES_DEPS,
declaration = True,
emit_declaration_only = True,
out_dir = "target_types",
root_dir = "src",
tsconfig = ":tsconfig",
)
js_library(
name = PKG_DIRNAME,
srcs = NPM_MODULE_EXTRA_FILES,
deps = RUNTIME_DEPS + [":target_node", ":target_web"],
package_name = PKG_REQUIRE_NAME,
visibility = ["//visibility:public"],
)
pkg_npm(
name = "npm_module",
deps = [":" + PKG_DIRNAME],
)
filegroup(
name = "build",
srcs = [":npm_module"],
visibility = ["//visibility:public"],
)
pkg_npm_types(
name = "npm_module_types",
srcs = SRCS,
deps = [":tsc_types"],
package_name = PKG_REQUIRE_NAME,
tsconfig = ":tsconfig",
visibility = ["//visibility:public"],
)
filegroup(
name = "build_types",
srcs = [":npm_module_types"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,3 @@
# @kbn/aiops-utils
The `aiops-utils` package contains static utilities maintained by the ML team for AIOps-related efforts.


@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
module.exports = {
preset: '@kbn/test',
rootDir: '../..',
roots: ['<rootDir>/packages/kbn-aiops-utils'],
};


@@ -0,0 +1,8 @@
{
"name": "@kbn/aiops-utils",
"private": true,
"version": "1.0.0",
"main": "./target_node/index.js",
"browser": "./target_web/index.js",
"license": "SSPL-1.0 OR Elastic License 2.0"
}


@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export { streamFactory } from './lib/stream_factory';
export { useFetchStream } from './lib/use_fetch_stream';
export type {
UseFetchStreamCustomReducerParams,
UseFetchStreamParamsDefault,
} from './lib/use_fetch_stream';


@@ -1,8 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { acceptCompression } from './accept_compression';


@@ -1,11 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Headers } from '@kbn/core/server';
// TODO: Replace these with kbn packaged versions once we have those available to us.
// At the moment imports from runtime plugins into packages are not supported.
// import type { Headers } from '@kbn/core/server';
type Headers = Record<string, string | string[] | undefined>;
/**
* Returns whether request headers accept a response using gzip compression.


@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { ReducerAction } from 'react';
import type { UseFetchStreamParamsDefault } from './use_fetch_stream';
type GeneratorError = string | null;
/**
* Uses `fetch` and `getReader` to receive an API call as a stream with multiple chunks
* as soon as they are available. `fetchStream` is implemented as a generator that will
* yield/emit chunks and can be consumed for example like this:
*
* ```js
* for await (const [error, chunk] of fetchStream(...)) {
* ...
* }
* ```
*
* @param endpoint The API endpoint including the Kibana basepath.
* @param abortCtrl Abort controller for cancelling the request.
* @param body The request body. For now all requests are POST.
* @param ndjson Boolean flag to receive the stream as a raw string or NDJSON.
* @param bufferBounce A buffer timeout which defaults to 100ms. This collects stream
* chunks for the time of the timeout and only then yields/emits them.
* This is useful so we are more in control of passing on data to
* consuming React components and we won't hammer the DOM with
* updates on every received chunk.
*
* @returns - Yields/emits items in the format [error, value]
* inspired by node's recommended error convention for callbacks.
*/
export async function* fetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
abortCtrl: React.MutableRefObject<AbortController>,
body: I['body'],
ndjson = true,
bufferBounce = 100
): AsyncGenerator<
[GeneratorError, ReducerAction<I['reducer']> | Array<ReducerAction<I['reducer']>> | undefined]
> {
const stream = await fetch(endpoint, {
signal: abortCtrl.current.signal,
method: 'POST',
headers: {
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
},
...(Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),
});
if (!stream.ok) {
yield [`Error ${stream.status}: ${stream.statusText}`, undefined];
return;
}
if (stream.body !== null) {
// Note that Firefox 99 doesn't support `TextDecoderStream` yet.
// That's why we skip it here and use `TextDecoder` later to decode each chunk.
// Once Firefox supports it, we can use the following alternative:
// const reader = stream.body.pipeThrough(new TextDecoderStream()).getReader();
const reader = stream.body.getReader();
let partial = '';
let actionBuffer: Array<ReducerAction<I['reducer']>> = [];
let lastCall = 0;
while (true) {
try {
const { value: uint8array, done } = await reader.read();
if (done) break;
const value = new TextDecoder().decode(uint8array);
const full = `${partial}${value}`;
const parts = ndjson ? full.split('\n') : [full];
const last = ndjson ? parts.pop() : '';
partial = last ?? '';
const actions = (ndjson ? parts.map((p) => JSON.parse(p)) : parts) as Array<
ReducerAction<I['reducer']>
>;
actionBuffer.push(...actions);
const now = Date.now();
if (now - lastCall >= bufferBounce && actionBuffer.length > 0) {
yield [null, actionBuffer];
actionBuffer = [];
lastCall = now;
// In cases where the next chunk takes longer to be received than the `bufferBounce` timeout,
// we trigger this client side timeout to clear a potential intermediate buffer state.
// Since `yield` cannot be passed on to other scopes like callbacks,
// this pattern using a Promise is used to wait for the timeout.
yield new Promise<
[
GeneratorError,
ReducerAction<I['reducer']> | Array<ReducerAction<I['reducer']>> | undefined
]
>((resolve) => {
setTimeout(() => {
if (actionBuffer.length > 0) {
resolve([null, actionBuffer]);
actionBuffer = [];
lastCall = now;
} else {
resolve([null, []]);
}
}, bufferBounce + 10);
});
}
} catch (error) {
if (error.name !== 'AbortError') {
yield [error.toString(), undefined];
}
break;
}
}
// The stream reader might finish with a partially filled actionBuffer so
// we need to clear it once more after the request is done.
if (actionBuffer.length > 0) {
yield [null, actionBuffer];
actionBuffer.length = 0;
}
}
}


@@ -0,0 +1,189 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import zlib from 'zlib';
import { streamFactory } from './stream_factory';
interface MockItem {
type: string;
payload: string[];
}
const mockItem1: MockItem = {
type: 'add_fields',
payload: ['clientip'],
};
const mockItem2: MockItem = {
type: 'add_fields',
payload: ['referer'],
};
describe('streamFactory', () => {
it('should encode and receive an uncompressed string based stream', async () => {
const { end, push, responseWithHeaders } = streamFactory({});
push('push1');
push('push2');
end();
let streamResult = '';
for await (const chunk of responseWithHeaders.body) {
streamResult += chunk.toString('utf8');
}
expect(responseWithHeaders.headers).toBe(undefined);
expect(streamResult).toBe('push1push2');
});
it('should encode and receive an uncompressed NDJSON based stream', async () => {
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>({});
push(mockItem1);
push(mockItem2);
end();
let streamResult = '';
for await (const chunk of responseWithHeaders.body) {
streamResult += chunk.toString('utf8');
}
const streamItems = streamResult.split(DELIMITER);
const lastItem = streamItems.pop();
const parsedItems = streamItems.map((d) => JSON.parse(d));
expect(responseWithHeaders.headers).toBe(undefined);
expect(parsedItems).toHaveLength(2);
expect(parsedItems[0]).toStrictEqual(mockItem1);
expect(parsedItems[1]).toStrictEqual(mockItem2);
expect(lastItem).toBe('');
});
// Because zlib.gunzip's API expects a callback, we need to use `done` here
// to indicate once all assertions are run. However, it's not allowed to use both
// `async` and `done` for the test callback. That's why we're using an "async IIFE"
// pattern inside the tests callback to still be able to do async/await for the
// `for await()` part. Note that the unzipping here is done just to be able to
// decode the stream for the test and assert it. When used in actual code,
// the browser on the client side will automatically take care of unzipping
// without the need for additional custom code.
it('should encode and receive a compressed string based stream', (done) => {
(async () => {
const { end, push, responseWithHeaders } = streamFactory({
'accept-encoding': 'gzip',
});
push('push1');
push('push2');
end();
const chunks = [];
for await (const chunk of responseWithHeaders.body) {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);
zlib.gunzip(buffer, function (err, decoded) {
expect(err).toBe(null);
const streamResult = decoded.toString('utf8');
expect(responseWithHeaders.headers).toStrictEqual({ 'content-encoding': 'gzip' });
expect(streamResult).toBe('push1push2');
done();
});
})();
});
it('should encode and receive a compressed NDJSON based stream', (done) => {
(async () => {
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>({
'accept-encoding': 'gzip',
});
push(mockItem1);
push(mockItem2);
end();
const chunks = [];
for await (const chunk of responseWithHeaders.body) {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);
zlib.gunzip(buffer, function (err, decoded) {
expect(err).toBe(null);
const streamResult = decoded.toString('utf8');
const streamItems = streamResult.split(DELIMITER);
const lastItem = streamItems.pop();
const parsedItems = streamItems.map((d) => JSON.parse(d));
expect(responseWithHeaders.headers).toStrictEqual({ 'content-encoding': 'gzip' });
expect(parsedItems).toHaveLength(2);
expect(parsedItems[0]).toStrictEqual(mockItem1);
expect(parsedItems[1]).toStrictEqual(mockItem2);
expect(lastItem).toBe('');
done();
});
})();
});
it('should throw when a string based stream receives a non-string chunk', async () => {
const { push } = streamFactory({});
// First push initializes the stream as string based.
expect(() => {
push('push1');
}).not.toThrow();
// Second push is again a string and should not throw.
expect(() => {
push('push2');
}).not.toThrow();
// Third push is not a string and should trigger an error.
expect(() => {
push({ myObject: 'push3' } as unknown as string);
}).toThrow('Must not push non-string chunks to a string based stream.');
});
it('should throw when an NDJSON based stream receives a string chunk', async () => {
const { push } = streamFactory<MockItem>({});
// First push initializes the stream as NDJSON based.
expect(() => {
push(mockItem1);
}).not.toThrow();
// Second push is again a valid object and should not throw.
expect(() => {
push(mockItem1);
}).not.toThrow();
// Third push is a string and should trigger an error.
expect(() => {
push('push3' as unknown as MockItem);
}).toThrow('Must not push raw string chunks to an NDJSON based stream.');
});
it('should throw for undefined as push value', async () => {
const { push } = streamFactory({});
expect(() => {
push(undefined as unknown as string);
}).toThrow('Stream chunk must not be undefined.');
});
});


@@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Stream } from 'stream';
import zlib from 'zlib';
// TODO: Replace these with kbn packaged versions once we have those available to us.
// At the moment imports from runtime plugins into packages are not supported.
// import type { Headers } from '@kbn/core/server';
import { acceptCompression } from './accept_compression';
type Headers = Record<string, string | string[] | undefined>;
// We need this, otherwise the Kibana server will crash with an 'ERR_METHOD_NOT_IMPLEMENTED' error.
class ResponseStream extends Stream.PassThrough {
flush() {}
_read() {}
}
const DELIMITER = '\n';
type StreamType = 'string' | 'ndjson';
interface StreamFactoryReturnType<T = unknown> {
DELIMITER: string;
end: () => void;
error: (errorText: string) => void;
push: (d: T) => void;
responseWithHeaders: {
body: zlib.Gzip | ResponseStream;
// TODO: Replace these with kbn packaged versions once we have those available to us.
// At the moment imports from runtime plugins into packages are not supported.
headers?: any;
};
}
/**
* Overload to set up a string based response stream with support
* for gzip compression depending on provided request headers.
*
* @param headers - Request headers.
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = string>(headers: Headers): StreamFactoryReturnType<T>;
/**
* Sets up a response stream with support for gzip compression depending on provided
* request headers. Any non-string data pushed to the stream will be streamed as NDJSON.
*
* @param headers - Request headers.
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = unknown>(headers: Headers): StreamFactoryReturnType<T> {
let streamType: StreamType;
const isCompressed = acceptCompression(headers);
const stream = isCompressed ? zlib.createGzip() : new ResponseStream();
function error(errorText: string) {
stream.emit('error', errorText);
}
function end() {
stream.end();
}
function push(d: T) {
if (d === undefined) {
error('Stream chunk must not be undefined.');
return;
}
// Initialize the stream type with the first push to the stream,
// otherwise check the integrity of the data to be pushed.
if (streamType === undefined) {
streamType = typeof d === 'string' ? 'string' : 'ndjson';
} else if (streamType === 'string' && typeof d !== 'string') {
error('Must not push non-string chunks to a string based stream.');
return;
} else if (streamType === 'ndjson' && typeof d === 'string') {
error('Must not push raw string chunks to an NDJSON based stream.');
return;
}
try {
const line = typeof d !== 'string' ? `${JSON.stringify(d)}${DELIMITER}` : d;
stream.write(line);
} catch (e) {
error(`Could not serialize or stream data chunk: ${e.toString()}`);
}
// Calling .flush() on a compression stream will
// make zlib return as much output as currently possible.
if (isCompressed) {
stream.flush();
}
}
const responseWithHeaders: StreamFactoryReturnType['responseWithHeaders'] = {
body: stream,
...(isCompressed
? {
headers: {
'content-encoding': 'gzip',
},
}
: {}),
};
return { DELIMITER, end, error, push, responseWithHeaders };
}


@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Reducer, ReducerAction, ReducerState } from 'react';
type StringReducerPayload = string | string[] | undefined;
export type StringReducer = Reducer<string, StringReducerPayload>;
/**
* The `stringReducer` is provided to handle plain string based streams with `streamFactory()`.
*
* @param state - The current state, being the string fetched so far.
* @param payload - The state update can be a plain string, an array of strings or `undefined`.
* * An array of strings will be joined without a delimiter and added to the current string.
* In combination with `useFetchStream`'s buffering, this allows bulk updates
* within the reducer without triggering a React/DOM update on every stream chunk.
* * `undefined` can be used to reset the state to an empty string, for example, when a
* UI has the option to trigger a refetch of a stream.
*
* @returns The updated state, a string that combines the previous string and the payload.
*/
export function stringReducer(
state: ReducerState<StringReducer>,
payload: ReducerAction<StringReducer>
): ReducerState<StringReducer> {
if (payload === undefined) {
return '';
}
return `${state}${Array.isArray(payload) ? payload.join('') : payload}`;
}


@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import {
useEffect,
useReducer,
useRef,
useState,
Dispatch,
Reducer,
ReducerAction,
ReducerState,
} from 'react';
import { fetchStream } from './fetch_stream';
import { stringReducer, StringReducer } from './string_reducer';
/**
* Custom hook type definition of the base params for an NDJSON stream with custom reducer.
*/
export interface UseFetchStreamCustomReducerParams {
endpoint: string;
body: object;
reducer: Reducer<any, any>;
}
/**
* Custom hook type definition of the base params for a string-based stream without a custom reducer.
*/
export interface UseFetchStreamParamsDefault {
endpoint: string;
body: object;
reducer: StringReducer;
}
interface UseFetchStreamReturnType<Data, Action> {
cancel: () => void;
data: Data;
dispatch: Dispatch<Action>;
error: string | undefined;
isCancelled: boolean;
isRunning: boolean;
start: () => Promise<void>;
}
// These overloads allow us to fall back to a simple reducer that just acts on a string as the reducer state
// if no options are supplied. Passing in options will use a custom reducer with appropriate type support.
export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
body: I['body']
): UseFetchStreamReturnType<string, ReducerAction<I['reducer']>>;
export function useFetchStream<
I extends UseFetchStreamCustomReducerParams,
BasePath extends string
>(
endpoint: `${BasePath}${I['endpoint']}`,
body: I['body'],
options: { reducer: I['reducer']; initialState: ReducerState<I['reducer']> }
): UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>>;
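// To illustrate the overloads, both call forms inside a component (a sketch;
// the endpoints, types and reducer come from the `response_stream` example
// plugin, not from this file):
//
//   const simple = useFetchStream<ApiSimpleStringStream, typeof basePath>(
//     `${basePath}/internal/response_stream/simple_string_stream`,
//     { timeout: 500 }
//   );
//
//   const reduced = useFetchStream<ApiReducerStream, typeof basePath>(
//     `${basePath}/internal/response_stream/reducer_stream`,
//     { simulateErrors: false },
//     { reducer: reducerStreamReducer, initialState }
//   );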
/**
* Custom hook to receive streaming data.
*
* @param endpoint - API endpoint including Kibana base path.
* @param body - API request body.
* @param options - Optional custom reducer and initial state.
* @returns An object with streaming data and methods to act on the stream.
*/
export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
body: I['body'],
options?: { reducer: I['reducer']; initialState: ReducerState<I['reducer']> }
): UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>> {
const [error, setError] = useState<string | undefined>();
const [isCancelled, setIsCancelled] = useState(false);
const [isRunning, setIsRunning] = useState(false);
const reducer = (options?.reducer ?? stringReducer) as I['reducer'];
const initialState = (options?.initialState ?? '') as ReducerState<I['reducer']>;
const [data, dispatch] = useReducer(reducer, initialState);
const abortCtrl = useRef(new AbortController());
const start = async () => {
if (isRunning) {
setError('Restart not supported yet.');
return;
}
setError(undefined);
setIsRunning(true);
setIsCancelled(false);
abortCtrl.current = new AbortController();
for await (const [fetchStreamError, actions] of fetchStream<
UseFetchStreamCustomReducerParams,
BasePath
>(endpoint, abortCtrl, body, options !== undefined)) {
if (fetchStreamError !== null) {
setError(fetchStreamError);
} else if (actions.length > 0) {
dispatch(actions as ReducerAction<I['reducer']>);
}
}
setIsRunning(false);
};
const cancel = () => {
abortCtrl.current.abort();
setIsCancelled(true);
setIsRunning(false);
};
// If components using this custom hook get unmounted, cancel any ongoing request.
useEffect(() => {
return () => abortCtrl.current.abort();
}, []);
return {
cancel,
data,
dispatch,
error,
isCancelled,
isRunning,
start,
};
}
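A sketch of the custom-reducer overload from a consuming component. The state shape, action type, reducer, and request body here are illustrative assumptions, not part of this commit; only the endpoint path mirrors the functional test elsewhere in this diff:

```tsx
import React, { FC } from 'react';
import { useFetchStream } from '@kbn/aiops-utils';

// Illustrative stream state and actions (assumptions, not part of this commit).
interface DemoState {
  entities: string[];
}
type DemoAction = { type: 'add_entity'; payload: string };
const initialState: DemoState = { entities: [] };

// The hook buffers stream chunks, so the reducer may receive an array of actions.
function demoReducer(state: DemoState, action: DemoAction | DemoAction[]): DemoState {
  const actions = Array.isArray(action) ? action : [action];
  return { entities: [...state.entities, ...actions.map((a) => a.payload)] };
}

interface DemoApi {
  endpoint: '/internal/response_stream/reducer_stream';
  body: { timeout: number };
  reducer: typeof demoReducer;
}

export const StreamDemo: FC<{ basePath: string }> = ({ basePath }) => {
  // `data` is typed as `DemoState`, derived from the reducer passed in via options.
  const { start, cancel, data, isRunning, error } = useFetchStream<DemoApi, typeof basePath>(
    `${basePath}/internal/response_stream/reducer_stream`,
    { timeout: 500 },
    { reducer: demoReducer, initialState }
  );

  return (
    <div>
      <button onClick={() => (isRunning ? cancel() : start())}>
        {isRunning ? 'Cancel' : 'Start'}
      </button>
      {error && <p>{error}</p>}
      <p>{data.entities.join(', ')}</p>
    </div>
  );
};
```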


@ -0,0 +1,18 @@
{
"extends": "../../tsconfig.bazel.json",
"compilerOptions": {
"declaration": true,
"emitDeclarationOnly": true,
"outDir": "target_types",
"rootDir": "src",
"stripInternal": false,
"types": [
"jest",
"node",
"react"
]
},
"include": [
"src/**/*"
]
}


@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { FtrProviderContext } from '../../functional/ftr_provider_context';
// eslint-disable-next-line import/no-default-export
export default function ({ getService, getPageObjects, loadTestFile }: FtrProviderContext) {
describe('response stream', function () {
loadTestFile(require.resolve('./reducer_stream'));
});
}


@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export async function* parseStream(stream: NodeJS.ReadableStream) {
let partial = '';
try {
for await (const value of stream) {
const full = `${partial}${value}`;
const parts = full.split('\n');
const last = parts.pop();
partial = last ?? '';
const actions = parts.map((p) => JSON.parse(p));
for (const action of actions) {
yield action;
}
}
} catch (error) {
yield { type: 'error', payload: error.toString() };
}
}
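A sketch of how a functional test could consume this helper with `node-fetch`; the endpoint and request body mirror the reducer stream test in this commit, while the shape of the logged actions is an assumption:

```ts
import fetch from 'node-fetch';
import { parseStream } from './parse_stream';

async function collectActions(kibanaServerUrl: string) {
  const response = await fetch(`${kibanaServerUrl}/internal/response_stream/reducer_stream`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', 'kbn-xsrf': 'stream' },
    body: JSON.stringify({ timeout: 1 }),
  });

  const actions = [];
  // Each yielded value is one parsed NDJSON line; a parse failure surfaces
  // as a single `{ type: 'error', payload }` action.
  for await (const action of parseStream(response.body)) {
    actions.push(action);
  }
  return actions;
}
```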


@ -1,8 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import fetch from 'node-fetch';
@ -10,19 +11,20 @@ import { format as formatUrl } from 'url';
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../ftr_provider_context';
import { FtrProviderContext } from '../../functional/ftr_provider_context';
import { parseStream } from './parse_stream';
// eslint-disable-next-line import/no-default-export
export default ({ getService }: FtrProviderContext) => {
const supertest = getService('supertest');
const config = getService('config');
const kibanaServerUrl = formatUrl(config.get('servers.kibana'));
describe('POST /internal/aiops/example_stream', () => {
describe('POST /internal/response_stream/reducer_stream', () => {
it('should return full data without streaming', async () => {
const resp = await supertest
.post(`/internal/aiops/example_stream`)
.post('/internal/response_stream/reducer_stream')
.set('kbn-xsrf', 'kibana')
.send({
timeout: 1,
@ -55,7 +57,7 @@ export default ({ getService }: FtrProviderContext) => {
});
it('should return data in chunks with streaming', async () => {
const response = await fetch(`${kibanaServerUrl}/internal/aiops/example_stream`, {
const response = await fetch(`${kibanaServerUrl}/internal/response_stream/reducer_stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',


@ -31,6 +31,8 @@
"@kbn/paertial-results-example-plugin/*": ["examples/partial_results_example/*"],
"@kbn/preboot-example-plugin": ["examples/preboot_example"],
"@kbn/preboot-example-plugin/*": ["examples/preboot_example/*"],
"@kbn/response-stream-plugin": ["examples/response_stream"],
"@kbn/response-stream-plugin/*": ["examples/response_stream/*"],
"@kbn/routing-example-plugin": ["examples/routing_example"],
"@kbn/routing-example-plugin/*": ["examples/routing_example/*"],
"@kbn/screenshot-mode-example-plugin": ["examples/screenshot_mode_example"],


@ -9,20 +9,15 @@ import type {
AiopsExplainLogRateSpikesSchema,
AiopsExplainLogRateSpikesApiAction,
} from './explain_log_rate_spikes';
import type { AiopsExampleStreamSchema, AiopsExampleStreamApiAction } from './example_stream';
import { streamReducer } from './stream_reducer';
export const API_ENDPOINT = {
EXAMPLE_STREAM: '/internal/aiops/example_stream',
EXPLAIN_LOG_RATE_SPIKES: '/internal/aiops/explain_log_rate_spikes',
} as const;
export type ApiEndpoint = typeof API_ENDPOINT[keyof typeof API_ENDPOINT];
export interface ApiEndpointOptions {
[API_ENDPOINT.EXAMPLE_STREAM]: AiopsExampleStreamSchema;
[API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES]: AiopsExplainLogRateSpikesSchema;
}
export interface ApiEndpointActions {
[API_ENDPOINT.EXAMPLE_STREAM]: AiopsExampleStreamApiAction;
[API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES]: AiopsExplainLogRateSpikesApiAction;
export interface ApiExplainLogRateSpikes {
endpoint: typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES;
reducer: typeof streamReducer;
body: AiopsExplainLogRateSpikesSchema;
actions: AiopsExplainLogRateSpikesApiAction;
}


@ -5,10 +5,7 @@
* 2.0.
*/
import {
API_ACTION_NAME,
AiopsExplainLogRateSpikesApiAction,
} from '../../../common/api/explain_log_rate_spikes';
import { API_ACTION_NAME, AiopsExplainLogRateSpikesApiAction } from './explain_log_rate_spikes';
interface StreamState {
fields: string[];


@ -10,10 +10,11 @@ import React, { useEffect, FC } from 'react';
import { EuiBadge, EuiSpacer, EuiText } from '@elastic/eui';
import type { DataView } from '@kbn/data-views-plugin/public';
import { useFetchStream } from '@kbn/aiops-utils';
import { useKibana } from '@kbn/kibana-react-plugin/public';
import { useStreamFetchReducer } from '../../hooks/use_stream_fetch_reducer';
import { initialState, streamReducer } from './stream_reducer';
import { initialState, streamReducer } from '../../../common/api/stream_reducer';
import type { ApiExplainLogRateSpikes } from '../../../common/api';
/**
* ExplainLogRateSpikes props require a data view.
@ -24,11 +25,13 @@ export interface ExplainLogRateSpikesProps {
}
export const ExplainLogRateSpikes: FC<ExplainLogRateSpikesProps> = ({ dataView }) => {
const { start, data, isRunning } = useStreamFetchReducer(
'/internal/aiops/explain_log_rate_spikes',
streamReducer,
initialState,
{ index: dataView.title }
const kibana = useKibana();
const basePath = kibana.services.http?.basePath.get() ?? '';
const { start, data, isRunning } = useFetchStream<ApiExplainLogRateSpikes, typeof basePath>(
`${basePath}/internal/aiops/explain_log_rate_spikes`,
{ index: dataView.title },
{ reducer: streamReducer, initialState }
);
useEffect(() => {


@ -1,12 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SingleEndpointStreamingDemo } from './single_endpoint_streaming_demo';
// required for dynamic import using React.lazy()
// eslint-disable-next-line import/no-default-export
export default SingleEndpointStreamingDemo;


@ -1,135 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { useEffect, useState, FC } from 'react';
import { Chart, Settings, Axis, BarSeries, Position, ScaleType } from '@elastic/charts';
import { i18n } from '@kbn/i18n';
import { useKibana } from '@kbn/kibana-react-plugin/public';
import {
EuiBadge,
EuiButton,
EuiCheckbox,
EuiFlexGroup,
EuiFlexItem,
EuiProgress,
EuiSpacer,
EuiText,
} from '@elastic/eui';
import { useStreamFetchReducer } from '../../hooks/use_stream_fetch_reducer';
import { getStatusMessage } from './get_status_message';
import { initialState, resetStream, streamReducer } from './stream_reducer';
export const SingleEndpointStreamingDemo: FC = () => {
const { notifications } = useKibana();
const [simulateErrors, setSimulateErrors] = useState(false);
const { dispatch, start, cancel, data, isCancelled, isRunning } = useStreamFetchReducer(
'/internal/aiops/example_stream',
streamReducer,
initialState,
{ simulateErrors }
);
const { errors, progress, entities } = data;
const onClickHandler = async () => {
if (isRunning) {
cancel();
} else {
dispatch(resetStream());
start();
}
};
useEffect(() => {
if (errors.length > 0) {
notifications.toasts.danger({ body: errors[errors.length - 1] });
}
}, [errors, notifications.toasts]);
const buttonLabel = isRunning
? i18n.translate('xpack.aiops.stopbuttonText', {
defaultMessage: 'Stop development',
})
: i18n.translate('xpack.aiops.startbuttonText', {
defaultMessage: 'Start development',
});
return (
<EuiText>
<EuiFlexGroup alignItems="center">
<EuiFlexItem grow={false}>
<EuiButton type="primary" size="s" onClick={onClickHandler} aria-label={buttonLabel}>
{buttonLabel}
</EuiButton>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<EuiText>
<EuiBadge>{progress}%</EuiBadge>
</EuiText>
</EuiFlexItem>
<EuiFlexItem>
<EuiProgress value={progress} max={100} size="xs" />
</EuiFlexItem>
</EuiFlexGroup>
<EuiSpacer />
<div style={{ height: '300px' }}>
<Chart>
<Settings rotation={90} />
<Axis
id="entities"
position={Position.Bottom}
title={i18n.translate('xpack.aiops.barChart.commitsTitle', {
defaultMessage: 'Commits',
})}
showOverlappingTicks
/>
<Axis
id="left2"
title={i18n.translate('xpack.aiops.barChart.developersTitle', {
defaultMessage: 'Developers',
})}
position={Position.Left}
/>
<BarSeries
id="commits"
xScaleType={ScaleType.Linear}
yScaleType={ScaleType.Linear}
xAccessor="x"
yAccessors={['y']}
data={Object.entries(entities)
.map(([x, y]) => {
return {
x,
y,
};
})
.sort((a, b) => b.y - a.y)}
/>
</Chart>
</div>
<p>{getStatusMessage(isRunning, isCancelled, data.progress)}</p>
<EuiCheckbox
id="aiopSimulateErrorsCheckbox"
label={i18n.translate('xpack.aiops.explainLogRateSpikes.simulateErrorsCheckboxLabel', {
defaultMessage:
'Simulate errors (gets applied to new streams only, not currently running ones).',
})}
checked={simulateErrors}
onChange={(e) => setSimulateErrors(!simulateErrors)}
compressed
/>
</EuiText>
);
};


@ -1,101 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type React from 'react';
import type { ApiEndpoint, ApiEndpointActions, ApiEndpointOptions } from '../../common/api';
interface ErrorAction {
type: 'error';
payload: string;
}
export async function* streamFetch<E extends ApiEndpoint>(
endpoint: E,
abortCtrl: React.MutableRefObject<AbortController>,
options: ApiEndpointOptions[E],
basePath = ''
): AsyncGenerator<Array<ApiEndpointActions[E] | ErrorAction>> {
const stream = await fetch(`${basePath}${endpoint}`, {
signal: abortCtrl.current.signal,
method: 'POST',
headers: {
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
},
body: JSON.stringify(options),
});
if (stream.body !== null) {
// Note that Firefox 99 doesn't support `TextDecoderStream` yet.
// That's why we skip it here and use `TextDecoder` later to decode each chunk.
// Once Firefox supports it, we can use the following alternative:
// const reader = stream.body.pipeThrough(new TextDecoderStream()).getReader();
const reader = stream.body.getReader();
const bufferBounce = 100;
let partial = '';
let actionBuffer: Array<ApiEndpointActions[E]> = [];
let lastCall = 0;
while (true) {
try {
const { value: uint8array, done } = await reader.read();
if (done) break;
const value = new TextDecoder().decode(uint8array);
const full = `${partial}${value}`;
const parts = full.split('\n');
const last = parts.pop();
partial = last ?? '';
const actions = parts.map((p) => JSON.parse(p)) as Array<ApiEndpointActions[E]>;
actionBuffer.push(...actions);
const now = Date.now();
if (now - lastCall >= bufferBounce && actionBuffer.length > 0) {
yield actionBuffer;
actionBuffer = [];
lastCall = now;
// In cases where the next chunk takes longer to be received than the `bufferBounce` timeout,
// we trigger this client side timeout to clear a potential intermediate buffer state.
// Since `yield` cannot be passed on to other scopes like callbacks,
// this pattern using a Promise is used to wait for the timeout.
yield new Promise<Array<ApiEndpointActions[E]>>((resolve) => {
setTimeout(() => {
if (actionBuffer.length > 0) {
resolve(actionBuffer);
actionBuffer = [];
lastCall = now;
} else {
resolve([]);
}
}, bufferBounce + 10);
});
}
} catch (error) {
if (error.name !== 'AbortError') {
yield [{ type: 'error', payload: error.toString() }];
}
break;
}
}
// The reader might finish with a partially filled actionBuffer so
// we need to clear it once more after the request is done.
if (actionBuffer.length > 0) {
yield actionBuffer;
actionBuffer.length = 0;
}
}
}


@ -1,82 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
useEffect,
useReducer,
useRef,
useState,
Reducer,
ReducerAction,
ReducerState,
} from 'react';
import { useKibana } from '@kbn/kibana-react-plugin/public';
import type { ApiEndpoint, ApiEndpointOptions } from '../../common/api';
import { streamFetch } from './stream_fetch';
export const useStreamFetchReducer = <R extends Reducer<any, any>, E extends ApiEndpoint>(
endpoint: E,
reducer: R,
initialState: ReducerState<R>,
options: ApiEndpointOptions[E]
) => {
const kibana = useKibana();
const [isCancelled, setIsCancelled] = useState(false);
const [isRunning, setIsRunning] = useState(false);
const [data, dispatch] = useReducer(reducer, initialState);
const abortCtrl = useRef(new AbortController());
const start = async () => {
if (isRunning) {
throw new Error('Restart not supported yet');
}
setIsRunning(true);
setIsCancelled(false);
abortCtrl.current = new AbortController();
for await (const actions of streamFetch(
endpoint,
abortCtrl,
options,
kibana.services.http?.basePath.get()
)) {
if (actions.length > 0) {
dispatch(actions as ReducerAction<R>);
}
}
setIsRunning(false);
};
const cancel = () => {
abortCtrl.current.abort();
setIsCancelled(true);
setIsRunning(false);
};
// If components using this custom hook get unmounted, cancel any ongoing request.
useEffect(() => {
return () => abortCtrl.current.abort();
}, []);
return {
cancel,
data,
dispatch,
isCancelled,
isRunning,
start,
};
};


@ -14,5 +14,5 @@ export function plugin() {
}
export type { ExplainLogRateSpikesProps } from './components/explain_log_rate_spikes';
export { ExplainLogRateSpikes, SingleEndpointStreamingDemo } from './shared_lazy_components';
export { ExplainLogRateSpikes } from './shared_lazy_components';
export type { AiopsPluginSetup, AiopsPluginStart } from './types';


@ -12,9 +12,6 @@ import { EuiErrorBoundary, EuiLoadingContent } from '@elastic/eui';
import type { ExplainLogRateSpikesProps } from './components/explain_log_rate_spikes';
const ExplainLogRateSpikesLazy = React.lazy(() => import('./components/explain_log_rate_spikes'));
const SingleEndpointStreamingDemoLazy = React.lazy(
() => import('./components/single_endpoint_streaming_demo')
);
const LazyWrapper: FC = ({ children }) => (
<EuiErrorBoundary>
@ -31,12 +28,3 @@ export const ExplainLogRateSpikes: FC<ExplainLogRateSpikesProps> = (props) => (
<ExplainLogRateSpikesLazy {...props} />
</LazyWrapper>
);
/**
* Lazy-wrapped SingleEndpointStreamingDemo React component
*/
export const SingleEndpointStreamingDemo: FC = () => (
<LazyWrapper>
<SingleEndpointStreamingDemoLazy />
</LazyWrapper>
);


@ -1,106 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import zlib from 'zlib';
import { loggerMock, MockedLogger } from '@kbn/logging-mocks';
import { API_ENDPOINT } from '../../common/api';
import type { ApiEndpointActions } from '../../common/api';
import { streamFactory } from './stream_factory';
type Action = ApiEndpointActions['/internal/aiops/explain_log_rate_spikes'];
const mockItem1: Action = {
type: 'add_fields',
payload: ['clientip'],
};
const mockItem2: Action = {
type: 'add_fields',
payload: ['referer'],
};
describe('streamFactory', () => {
let mockLogger: MockedLogger;
beforeEach(() => {
mockLogger = loggerMock.create();
});
it('should encode and receive an uncompressed stream', async () => {
const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
>(mockLogger, {});
push(mockItem1);
push(mockItem2);
end();
let streamResult = '';
for await (const chunk of stream) {
streamResult += chunk.toString('utf8');
}
const streamItems = streamResult.split(DELIMITER);
const lastItem = streamItems.pop();
const parsedItems = streamItems.map((d) => JSON.parse(d));
expect(responseWithHeaders.headers).toBe(undefined);
expect(parsedItems).toHaveLength(2);
expect(parsedItems[0]).toStrictEqual(mockItem1);
expect(parsedItems[1]).toStrictEqual(mockItem2);
expect(lastItem).toBe('');
});
// Because zlib.gunzip's API expects a callback, we need to use `done` here
// to signal when all assertions have run. However, it's not allowed to use both
// `async` and `done` for the test callback. That's why we're using an "async IIFE"
// pattern inside the test's callback to still be able to use async/await for the
// `for await()` part. Note that the unzipping here is done just to be able to
// decode the stream for the test and assert it. When used in actual code,
// the browser on the client side will automatically take care of unzipping
// without the need for additional custom code.
it('should encode and receive a compressed stream', (done) => {
(async () => {
const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
>(mockLogger, { 'accept-encoding': 'gzip' });
push(mockItem1);
push(mockItem2);
end();
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);
zlib.gunzip(buffer, function (err, decoded) {
expect(err).toBe(null);
const streamResult = decoded.toString('utf8');
const streamItems = streamResult.split(DELIMITER);
const lastItem = streamItems.pop();
const parsedItems = streamItems.map((d) => JSON.parse(d));
expect(responseWithHeaders.headers).toStrictEqual({ 'content-encoding': 'gzip' });
expect(parsedItems).toHaveLength(2);
expect(parsedItems[0]).toStrictEqual(mockItem1);
expect(parsedItems[1]).toStrictEqual(mockItem2);
expect(lastItem).toBe('');
done();
});
})();
});
});


@ -1,70 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Stream } from 'stream';
import zlib from 'zlib';
import type { Headers, Logger } from '@kbn/core/server';
import { ApiEndpoint, ApiEndpointActions } from '../../common/api';
import { acceptCompression } from './accept_compression';
// We need this, otherwise the Kibana server will crash with an 'ERR_METHOD_NOT_IMPLEMENTED' error.
class ResponseStream extends Stream.PassThrough {
flush() {}
_read() {}
}
const DELIMITER = '\n';
/**
* Sets up a response stream with support for gzip compression depending on provided
* request headers.
*
* @param logger - Kibana provided logger.
* @param headers - Request headers.
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T extends ApiEndpoint>(logger: Logger, headers: Headers) {
const isCompressed = acceptCompression(headers);
const stream = isCompressed ? zlib.createGzip() : new ResponseStream();
function push(d: ApiEndpointActions[T]) {
try {
const line = JSON.stringify(d);
stream.write(`${line}${DELIMITER}`);
// Calling .flush() on a compression stream will
// make zlib return as much output as currently possible.
if (isCompressed) {
stream.flush();
}
} catch (error) {
logger.error('Could not serialize or stream a message.');
logger.error(error);
}
}
function end() {
stream.end();
}
const responseWithHeaders = {
body: stream,
...(isCompressed
? {
headers: {
'content-encoding': 'gzip',
},
}
: {}),
};
return { DELIMITER, end, push, responseWithHeaders, stream };
}
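For context, a sketch of how a route handler might have wired up this (since removed) aiops-local factory. The handler wiring is illustrative; `router`, `logger`, the endpoint constant, and the schema are assumed to be in scope as in the actual route file:

```ts
router.post(
  {
    path: API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES,
    validate: { body: aiopsExplainLogRateSpikesSchema },
  },
  async (context, request, response) => {
    const { end, push, responseWithHeaders } = streamFactory<
      typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
    >(logger, request.headers);

    // Push NDJSON-framed actions, then close the stream.
    push({ type: 'add_fields', payload: ['clientip'] });
    end();

    // `responseWithHeaders` carries the stream body and, if the client
    // accepts it, the `content-encoding: gzip` header.
    return response.ok(responseWithHeaders);
  }
);
```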


@ -16,7 +16,7 @@ import {
AiopsPluginSetupDeps,
AiopsPluginStartDeps,
} from './types';
import { defineExampleStreamRoute, defineExplainLogRateSpikesRoute } from './routes';
import { defineExplainLogRateSpikesRoute } from './routes';
export class AiopsPlugin
implements Plugin<AiopsPluginSetup, AiopsPluginStart, AiopsPluginSetupDeps, AiopsPluginStartDeps>
@ -34,7 +34,6 @@ export class AiopsPlugin
// Register server side APIs
if (AIOPS_ENABLED) {
core.getStartServices().then(([_, depsStart]) => {
defineExampleStreamRoute(router, this.logger);
defineExplainLogRateSpikesRoute(router, this.logger);
});
}


@ -9,15 +9,15 @@ import { firstValueFrom } from 'rxjs';
import type { IRouter, Logger } from '@kbn/core/server';
import type { DataRequestHandlerContext, IEsSearchRequest } from '@kbn/data-plugin/server';
import { streamFactory } from '@kbn/aiops-utils';
import {
aiopsExplainLogRateSpikesSchema,
addFieldsAction,
AiopsExplainLogRateSpikesApiAction,
} from '../../common/api/explain_log_rate_spikes';
import { API_ENDPOINT } from '../../common/api';
import { streamFactory } from '../lib/stream_factory';
export const defineExplainLogRateSpikesRoute = (
router: IRouter<DataRequestHandlerContext>,
logger: Logger
@ -60,9 +60,9 @@ export const defineExplainLogRateSpikesRoute = (
const doc = res.rawResponse.hits.hits.pop();
const fields = Object.keys(doc?._source ?? {});
const { end, push, responseWithHeaders } = streamFactory<
typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
>(logger, request.headers);
const { end, push, responseWithHeaders } = streamFactory<AiopsExplainLogRateSpikesApiAction>(
request.headers
);
async function pushField() {
setTimeout(() => {
@ -79,7 +79,9 @@ export const defineExplainLogRateSpikesRoute = (
} else {
end();
}
}, Math.random() * 1000);
// This is just demo code, so we add a random timeout of 0-250ms to each
// stream push to simulate string chunks appearing on the client with some randomness.
}, Math.random() * 250);
}
pushField();


@ -5,5 +5,4 @@
* 2.0.
*/
export { defineExampleStreamRoute } from './example_stream';
export { defineExplainLogRateSpikesRoute } from './explain_log_rate_spikes';


@ -55,7 +55,6 @@ export const ML_PAGES = {
AIOPS: 'aiops',
AIOPS_EXPLAIN_LOG_RATE_SPIKES: 'aiops/explain_log_rate_spikes',
AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT: 'aiops/explain_log_rate_spikes_index_select',
AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO: 'aiops/single_endpoint_streaming_demo',
} as const;
export type MlPages = typeof ML_PAGES[keyof typeof ML_PAGES];


@ -64,8 +64,7 @@ export type MlGenericUrlState = MLPageState<
| typeof ML_PAGES.DATA_VISUALIZER_INDEX_SELECT
| typeof ML_PAGES.AIOPS
| typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES
| typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT
| typeof ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO,
| typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT,
MlGenericUrlPageState | undefined
>;


@ -1,34 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC } from 'react';
import { FormattedMessage } from '@kbn/i18n-react';
import { SingleEndpointStreamingDemo } from '@kbn/aiops-plugin/public';
import { useMlKibana, useTimefilter } from '../contexts/kibana';
import { HelpMenu } from '../components/help_menu';
import { MlPageHeader } from '../components/page_header';
export const SingleEndpointStreamingDemoPage: FC = () => {
useTimefilter({ timeRangeSelector: false, autoRefreshSelector: false });
const {
services: { docLinks },
} = useMlKibana();
return (
<>
<MlPageHeader>
<FormattedMessage
id="xpack.ml.singleEndpointStreamingDemo.pageHeader"
defaultMessage="Single endpoint streaming demo"
/>
</MlPageHeader>
<SingleEndpointStreamingDemo />
<HelpMenu docLink={docLinks.links.ml.guide} />
</>
);
};


@ -236,15 +236,6 @@ export function useSideNavItems(activeRoute: MlRoute | undefined) {
disabled: disableLinks,
testSubj: 'mlMainTab explainLogRateSpikes',
},
{
id: 'singleEndpointStreamingDemo',
pathId: ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO,
name: i18n.translate('xpack.ml.navMenu.singleEndpointStreamingDemoLinkText', {
defaultMessage: 'Single endpoint streaming demo',
}),
disabled: disableLinks,
testSubj: 'mlMainTab singleEndpointStreamingDemo',
},
],
});
}

View file

@ -6,4 +6,3 @@
*/
export * from './explain_log_rate_spikes';
export * from './single_endpoint_streaming_demo';


@ -1,63 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC } from 'react';
import { parse } from 'query-string';
import { i18n } from '@kbn/i18n';
import { AIOPS_ENABLED } from '@kbn/aiops-plugin/common';
import { NavigateToPath } from '../../../contexts/kibana';
import { MlRoute, PageLoader, PageProps } from '../../router';
import { useResolver } from '../../use_resolver';
import { SingleEndpointStreamingDemoPage as Page } from '../../../aiops/single_endpoint_streaming_demo';
import { checkBasicLicense } from '../../../license';
import { checkGetJobsCapabilitiesResolver } from '../../../capabilities/check_capabilities';
import { cacheDataViewsContract } from '../../../util/index_utils';
import { getBreadcrumbWithUrlForApp } from '../../breadcrumbs';
export const singleEndpointStreamingDemoRouteFactory = (
navigateToPath: NavigateToPath,
basePath: string
): MlRoute => ({
id: 'single_endpoint_streaming_demo',
path: '/aiops/single_endpoint_streaming_demo',
title: i18n.translate('xpack.ml.aiops.singleEndpointStreamingDemo.docTitle', {
defaultMessage: 'Single endpoint streaming demo',
}),
render: (props, deps) => <PageWrapper {...props} deps={deps} />,
breadcrumbs: [
getBreadcrumbWithUrlForApp('ML_BREADCRUMB', navigateToPath, basePath),
getBreadcrumbWithUrlForApp('AIOPS_BREADCRUMB', navigateToPath, basePath),
{
text: i18n.translate('xpack.ml.aiopsBreadcrumbs.singleEndpointStreamingDemoLabel', {
defaultMessage: 'Single endpoint streaming demo',
}),
},
],
disabled: !AIOPS_ENABLED,
});
const PageWrapper: FC<PageProps> = ({ location, deps }) => {
const { redirectToMlAccessDeniedPage } = deps;
const { index, savedSearchId }: Record<string, any> = parse(location.search, { sort: false });
const { context } = useResolver(index, savedSearchId, deps.config, deps.dataViewsContract, {
checkBasicLicense,
cacheDataViewsContract: () => cacheDataViewsContract(deps.dataViewsContract),
checkGetJobsCapabilities: () => checkGetJobsCapabilitiesResolver(redirectToMlAccessDeniedPage),
});
return (
<PageLoader context={context}>
<Page />
</PageLoader>
);
};


@ -87,7 +87,6 @@ export class MlLocatorDefinition implements LocatorDefinition<MlLocatorParams> {
case ML_PAGES.AIOPS:
case ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES:
case ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT:
case ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO:
case ML_PAGES.OVERVIEW:
case ML_PAGES.SETTINGS:
case ML_PAGES.FILTER_LISTS_MANAGE:


@ -14,7 +14,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
this.tags(['ml']);
if (AIOPS_ENABLED) {
loadTestFile(require.resolve('./example_stream'));
loadTestFile(require.resolve('./explain_log_rate_spikes'));
}
});


@ -2892,6 +2892,10 @@
version "0.0.0"
uid ""
"@kbn/aiops-utils@link:bazel-bin/packages/kbn-aiops-utils":
version "0.0.0"
uid ""
"@kbn/alerts@link:bazel-bin/packages/kbn-alerts":
version "0.0.0"
uid ""
@ -6146,6 +6150,10 @@
version "0.0.0"
uid ""
"@types/kbn__aiops-utils@link:bazel-bin/packages/kbn-aiops-utils/npm_module_types":
version "0.0.0"
uid ""
"@types/kbn__alerts@link:bazel-bin/packages/kbn-alerts/npm_module_types":
version "0.0.0"
uid ""