mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
[ML] Adds redux toolkit example for response_stream
to developer examples. (#182690)
## Summary Follow up to #132590. Part of #181111. This updates the developer examples for `@kbn/ml-response-stream` to include a variant with a full Redux Toolkit setup. For this case, the `@kbn/ml-response-stream` now includes a generic slice `streamSlice` that can be used. This allows the actions created to be streamed via NDJSON to be shared across server and client. Functional tests for the examples were added too. To run these tests you can use the following commands: ``` # Start the test server (can continue running) node scripts/functional_tests_server.js --config test/examples/config.js # Start a test run node scripts/functional_test_runner.js --config test/examples/config.js ``` ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [x] This was checked for breaking API changes and was [labeled appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
This commit is contained in:
parent
9194c8857c
commit
5345e34ddc
31 changed files with 999 additions and 261 deletions
|
@ -4,11 +4,15 @@ This plugin demonstrates how to stream chunks of data to the client with just a
|
|||
|
||||
To run Kibana with the described examples, use `yarn start --run-examples`.
|
||||
|
||||
The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates two use cases to get started: Streaming a raw string as well as a more complex example that streams Redux-like actions to the client which update React state via `useReducer()`.
|
||||
The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates some use cases to get you started:
|
||||
|
||||
Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data.
|
||||
- Streaming just a raw string.
|
||||
- Streaming NDJSON with "old-school" redux like actions and client side state managed with `useFetchStream()`. This uses React's own `useReducer()` under the hood.
|
||||
- Streaming NDJSON with actions created via Redux Toolkit's `createSlice()` to a client with a full Redux Toolkit setup.
|
||||
|
||||
No additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state.
|
||||
Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`) or slice (`streamSlice()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data.
|
||||
|
||||
Besides Redux Toolkit for its particular use case, no additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state.
|
||||
|
||||
On the server, the simpler stream to send a string is set up like this:
|
||||
|
||||
|
@ -21,12 +25,7 @@ The request's headers get passed on to automatically identify if compression is
|
|||
On the client, the custom hook is used like this:
|
||||
|
||||
```ts
|
||||
const {
|
||||
errors,
|
||||
start,
|
||||
cancel,
|
||||
data,
|
||||
isRunning
|
||||
} = useFetchStream('/internal/response_stream/simple_string_stream');
|
||||
const { errors, start, cancel, data, isRunning } = useFetchStream(
|
||||
'/internal/response_stream/simple_string_stream'
|
||||
);
|
||||
```
|
||||
|
||||
|
|
|
@ -8,5 +8,6 @@
|
|||
|
||||
export const RESPONSE_STREAM_API_ENDPOINT = {
|
||||
REDUCER_STREAM: '/internal/response_stream/reducer_stream',
|
||||
REDUX_STREAM: '/internal/response_stream/redux_stream',
|
||||
SIMPLE_STRING_STREAM: '/internal/response_stream/simple_string_stream',
|
||||
} as const;
|
||||
|
|
|
@ -9,71 +9,3 @@
|
|||
export { reducerStreamReducer } from './reducer';
|
||||
export { reducerStreamRequestBodySchema } from './request_body_schema';
|
||||
export type { ReducerStreamRequestBodySchema } from './request_body_schema';
|
||||
|
||||
export const API_ACTION_NAME = {
|
||||
UPDATE_PROGRESS: 'update_progress',
|
||||
ADD_TO_ENTITY: 'add_to_entity',
|
||||
DELETE_ENTITY: 'delete_entity',
|
||||
ERROR: 'error',
|
||||
} as const;
|
||||
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];
|
||||
|
||||
interface ApiActionUpdateProgress {
|
||||
type: typeof API_ACTION_NAME.UPDATE_PROGRESS;
|
||||
payload: number;
|
||||
}
|
||||
|
||||
export function updateProgressAction(payload: number): ApiActionUpdateProgress {
|
||||
return {
|
||||
type: API_ACTION_NAME.UPDATE_PROGRESS,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
||||
interface ApiActionAddToEntity {
|
||||
type: typeof API_ACTION_NAME.ADD_TO_ENTITY;
|
||||
payload: {
|
||||
entity: string;
|
||||
value: number;
|
||||
};
|
||||
}
|
||||
|
||||
export function addToEntityAction(entity: string, value: number): ApiActionAddToEntity {
|
||||
return {
|
||||
type: API_ACTION_NAME.ADD_TO_ENTITY,
|
||||
payload: {
|
||||
entity,
|
||||
value,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
interface ApiActionDeleteEntity {
|
||||
type: typeof API_ACTION_NAME.DELETE_ENTITY;
|
||||
payload: string;
|
||||
}
|
||||
|
||||
export function deleteEntityAction(payload: string): ApiActionDeleteEntity {
|
||||
return {
|
||||
type: API_ACTION_NAME.DELETE_ENTITY,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
||||
interface ApiActionError {
|
||||
type: typeof API_ACTION_NAME.ERROR;
|
||||
payload: string;
|
||||
}
|
||||
|
||||
export function errorAction(payload: string): ApiActionError {
|
||||
return {
|
||||
type: API_ACTION_NAME.ERROR,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
||||
export type ReducerStreamApiAction =
|
||||
| ApiActionUpdateProgress
|
||||
| ApiActionAddToEntity
|
||||
| ApiActionDeleteEntity
|
||||
| ApiActionError;
|
||||
|
|
|
@ -6,24 +6,14 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { ReducerStreamApiAction, API_ACTION_NAME } from '.';
|
||||
import { getInitialState, type StreamState } from '../stream_state';
|
||||
import { type ReducerStreamApiAction, API_ACTION_NAME } from './reducer_actions';
|
||||
|
||||
export const UI_ACTION_NAME = {
|
||||
RESET: 'reset',
|
||||
} as const;
|
||||
export type UiActionName = typeof UI_ACTION_NAME[keyof typeof UI_ACTION_NAME];
|
||||
|
||||
export interface StreamState {
|
||||
errors: string[];
|
||||
progress: number;
|
||||
entities: Record<string, number>;
|
||||
}
|
||||
export const initialState: StreamState = {
|
||||
errors: [],
|
||||
progress: 0,
|
||||
entities: {},
|
||||
};
|
||||
|
||||
interface UiActionResetStream {
|
||||
type: typeof UI_ACTION_NAME.RESET;
|
||||
}
|
||||
|
@ -34,14 +24,7 @@ export function resetStream(): UiActionResetStream {
|
|||
|
||||
type UiAction = UiActionResetStream;
|
||||
export type ReducerAction = ReducerStreamApiAction | UiAction;
|
||||
export function reducerStreamReducer(
|
||||
state: StreamState,
|
||||
action: ReducerAction | ReducerAction[]
|
||||
): StreamState {
|
||||
if (Array.isArray(action)) {
|
||||
return action.reduce(reducerStreamReducer, state);
|
||||
}
|
||||
|
||||
export function reducerStreamReducer(state: StreamState, action: ReducerAction): StreamState {
|
||||
switch (action.type) {
|
||||
case API_ACTION_NAME.UPDATE_PROGRESS:
|
||||
return {
|
||||
|
@ -72,7 +55,7 @@ export function reducerStreamReducer(
|
|||
errors: [...state.errors, action.payload],
|
||||
};
|
||||
case UI_ACTION_NAME.RESET:
|
||||
return initialState;
|
||||
return getInitialState();
|
||||
default:
|
||||
return state;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
export const API_ACTION_NAME = {
|
||||
UPDATE_PROGRESS: 'update_progress',
|
||||
ADD_TO_ENTITY: 'add_to_entity',
|
||||
DELETE_ENTITY: 'delete_entity',
|
||||
ERROR: 'error',
|
||||
} as const;
|
||||
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];
|
||||
|
||||
interface ApiActionUpdateProgress {
|
||||
type: typeof API_ACTION_NAME.UPDATE_PROGRESS;
|
||||
payload: number;
|
||||
}
|
||||
|
||||
export function updateProgressAction(payload: number): ApiActionUpdateProgress {
|
||||
return {
|
||||
type: API_ACTION_NAME.UPDATE_PROGRESS,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
||||
interface ApiActionAddToEntity {
|
||||
type: typeof API_ACTION_NAME.ADD_TO_ENTITY;
|
||||
payload: {
|
||||
entity: string;
|
||||
value: number;
|
||||
};
|
||||
}
|
||||
|
||||
export function addToEntityAction(entity: string, value: number): ApiActionAddToEntity {
|
||||
return {
|
||||
type: API_ACTION_NAME.ADD_TO_ENTITY,
|
||||
payload: {
|
||||
entity,
|
||||
value,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
interface ApiActionDeleteEntity {
|
||||
type: typeof API_ACTION_NAME.DELETE_ENTITY;
|
||||
payload: string;
|
||||
}
|
||||
|
||||
export function deleteEntityAction(payload: string): ApiActionDeleteEntity {
|
||||
return {
|
||||
type: API_ACTION_NAME.DELETE_ENTITY,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
||||
interface ApiActionError {
|
||||
type: typeof API_ACTION_NAME.ERROR;
|
||||
payload: string;
|
||||
}
|
||||
|
||||
export function errorAction(payload: string): ApiActionError {
|
||||
return {
|
||||
type: API_ACTION_NAME.ERROR,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
||||
export type ReducerStreamApiAction =
|
||||
| ApiActionUpdateProgress
|
||||
| ApiActionAddToEntity
|
||||
| ApiActionDeleteEntity
|
||||
| ApiActionError;
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { createSlice } from '@reduxjs/toolkit';
|
||||
import type { PayloadAction } from '@reduxjs/toolkit';
|
||||
|
||||
import { getInitialState } from '../stream_state';
|
||||
|
||||
export const dataSlice = createSlice({
|
||||
name: 'development',
|
||||
initialState: getInitialState(),
|
||||
reducers: {
|
||||
updateProgress: (state, action: PayloadAction<number>) => {
|
||||
state.progress = action.payload;
|
||||
},
|
||||
addToEntity: (
|
||||
state,
|
||||
action: PayloadAction<{
|
||||
entity: string;
|
||||
value: number;
|
||||
}>
|
||||
) => {
|
||||
const { entity, value } = action.payload;
|
||||
state.entities[entity] = (state.entities[entity] || 0) + value;
|
||||
},
|
||||
deleteEntity: (state, action: PayloadAction<string>) => {
|
||||
delete state.entities[action.payload];
|
||||
},
|
||||
error: (state, action: PayloadAction<string>) => {
|
||||
state.errors.push(action.payload);
|
||||
},
|
||||
reset: () => {
|
||||
return getInitialState();
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
export const { updateProgress, addToEntity, deleteEntity, error, reset } = dataSlice.actions;
|
||||
export type ReduxStreamApiAction = ReturnType<
|
||||
typeof dataSlice.actions[keyof typeof dataSlice.actions]
|
||||
>;
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { createSlice } from '@reduxjs/toolkit';
|
||||
import type { PayloadAction } from '@reduxjs/toolkit';
|
||||
|
||||
const getInitialState = () => ({
|
||||
simulateErrors: false,
|
||||
compressResponse: true,
|
||||
flushFix: false,
|
||||
});
|
||||
|
||||
export const optionsSlice = createSlice({
|
||||
name: 'options',
|
||||
initialState: getInitialState(),
|
||||
reducers: {
|
||||
setSimulateErrors: (state, action: PayloadAction<boolean>) => {
|
||||
state.simulateErrors = action.payload;
|
||||
},
|
||||
setCompressResponse: (state, action: PayloadAction<boolean>) => {
|
||||
state.compressResponse = action.payload;
|
||||
},
|
||||
setFlushFix: (state, action: PayloadAction<boolean>) => {
|
||||
state.flushFix = action.payload;
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
export const { setSimulateErrors, setCompressResponse, setFlushFix } = optionsSlice.actions;
|
||||
export type ReduxOptionsApiAction = ReturnType<
|
||||
typeof optionsSlice.actions[keyof typeof optionsSlice.actions]
|
||||
>;
|
19
examples/response_stream/common/api/stream_state.ts
Normal file
19
examples/response_stream/common/api/stream_state.ts
Normal file
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
export interface StreamState {
|
||||
errors: string[];
|
||||
progress: number;
|
||||
entities: Record<string, number>;
|
||||
}
|
||||
|
||||
export const getInitialState = (): StreamState => ({
|
||||
errors: [],
|
||||
progress: 0,
|
||||
entities: {},
|
||||
});
|
|
@ -18,7 +18,7 @@ export const Page: FC<PropsWithChildren<PageProps>> = ({ title = 'Untitled', chi
|
|||
<>
|
||||
<EuiPageTemplate.Header>
|
||||
<EuiTitle size="l">
|
||||
<h1>{title}</h1>
|
||||
<h1 data-test-subj="responseStreamPageTitle">{title}</h1>
|
||||
</EuiTitle>
|
||||
</EuiPageTemplate.Header>
|
||||
<EuiPageTemplate.Section>{children}</EuiPageTemplate.Section>
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import React, { type FC } from 'react';
|
||||
|
||||
import {
|
||||
Chart,
|
||||
Settings,
|
||||
Axis,
|
||||
BarSeries,
|
||||
Position,
|
||||
ScaleType,
|
||||
LEGACY_LIGHT_THEME,
|
||||
} from '@elastic/charts';
|
||||
|
||||
interface BarChartRaceProps {
|
||||
entities: Record<string, number>;
|
||||
}
|
||||
|
||||
export const BarChartRace: FC<BarChartRaceProps> = ({ entities }) => {
|
||||
return (
|
||||
<div style={{ height: '300px' }}>
|
||||
<Chart>
|
||||
<Settings
|
||||
// TODO connect to charts.theme service see src/plugins/charts/public/services/theme/README.md
|
||||
baseTheme={LEGACY_LIGHT_THEME}
|
||||
rotation={90}
|
||||
/>
|
||||
<Axis id="entities" position={Position.Bottom} title="Commits" showOverlappingTicks />
|
||||
<Axis id="left2" title="Developers" position={Position.Left} />
|
||||
|
||||
<BarSeries
|
||||
id="commits"
|
||||
xScaleType={ScaleType.Ordinal}
|
||||
yScaleType={ScaleType.Linear}
|
||||
xAccessor="x"
|
||||
yAccessors={['y']}
|
||||
data={Object.entries(entities)
|
||||
.map(([x, y]) => {
|
||||
return {
|
||||
x,
|
||||
y,
|
||||
};
|
||||
})
|
||||
.sort((a, b) => b.y - a.y)}
|
||||
/>
|
||||
</Chart>
|
||||
</div>
|
||||
);
|
||||
};
|
|
@ -8,16 +8,6 @@
|
|||
|
||||
import React, { useEffect, useState, FC } from 'react';
|
||||
|
||||
import {
|
||||
Chart,
|
||||
Settings,
|
||||
Axis,
|
||||
BarSeries,
|
||||
Position,
|
||||
ScaleType,
|
||||
LEGACY_LIGHT_THEME,
|
||||
} from '@elastic/charts';
|
||||
|
||||
import {
|
||||
EuiBadge,
|
||||
EuiButton,
|
||||
|
@ -31,8 +21,8 @@ import {
|
|||
|
||||
import { useFetchStream } from '@kbn/ml-response-stream/client';
|
||||
|
||||
import { getInitialState } from '../../../../../common/api/stream_state';
|
||||
import {
|
||||
initialState,
|
||||
resetStream,
|
||||
reducerStreamReducer,
|
||||
} from '../../../../../common/api/reducer_stream/reducer';
|
||||
|
@ -42,7 +32,10 @@ import { Page } from '../../../../components/page';
|
|||
|
||||
import { useDeps } from '../../../../hooks/use_deps';
|
||||
|
||||
import { getStatusMessage } from './get_status_message';
|
||||
import { BarChartRace } from '../../components/bar_chart_race';
|
||||
import { getStatusMessage } from '../../components/get_status_message';
|
||||
|
||||
const initialState = getInitialState();
|
||||
|
||||
export const PageReducerStream: FC = () => {
|
||||
const {
|
||||
|
@ -72,8 +65,12 @@ export const PageReducerStream: FC = () => {
|
|||
}
|
||||
};
|
||||
|
||||
// TODO This approach needs to be adapted as it might miss when error messages arrive bulk.
|
||||
// This is for low level errors on the stream/HTTP level.
|
||||
// TODO This needs to be adapted as it might miss when error messages arrive
|
||||
// in bulk, but it should be good enough for this demo. This is for low level
|
||||
// errors on the HTTP level.Note this will only surface errors that happen for
|
||||
// the original request. Once the stream returns data, it will not be able to
|
||||
// return errors. This is why we need separate error handling for application
|
||||
// level errors.
|
||||
useEffect(() => {
|
||||
if (errors.length > 0) {
|
||||
notifications.toasts.addDanger(errors[errors.length - 1]);
|
||||
|
@ -91,27 +88,33 @@ export const PageReducerStream: FC = () => {
|
|||
const buttonLabel = isRunning ? 'Stop development' : 'Start development';
|
||||
|
||||
return (
|
||||
<Page title={'Reducer stream'}>
|
||||
<Page title={'NDJSON useReducer stream'}>
|
||||
<EuiText>
|
||||
<p>
|
||||
This demonstrates a single endpoint with streaming support that sends Redux inspired
|
||||
actions from server to client. The server and client share types of the data to be
|
||||
received. The client uses a custom hook that receives stream chunks and passes them on to
|
||||
`useReducer()` that acts on the Redux type actions it receives. The custom hook includes
|
||||
code to buffer actions and is able to apply them in bulk so the DOM does not get hammered
|
||||
with updates. Hit "Start development" to trigger the bar chart race!
|
||||
This demonstrates a single endpoint with streaming support that sends old school Redux
|
||||
inspired actions from server to client. The server and client share types of the data to
|
||||
be received. The client uses a custom hook that receives stream chunks and passes them on
|
||||
to `useReducer()` that acts on the actions it receives. The custom hook includes code to
|
||||
buffer actions and is able to apply them in bulk so the DOM does not get hammered with
|
||||
updates. Hit "Start development" to trigger the bar chart race!
|
||||
</p>
|
||||
</EuiText>
|
||||
<br />
|
||||
<EuiFlexGroup alignItems="center">
|
||||
<EuiFlexItem grow={false}>
|
||||
<EuiButton color="primary" size="s" onClick={onClickHandler} aria-label={buttonLabel}>
|
||||
<EuiButton
|
||||
data-test-subj="responseStreamStartButton"
|
||||
color="primary"
|
||||
size="s"
|
||||
onClick={onClickHandler}
|
||||
aria-label={buttonLabel}
|
||||
>
|
||||
{buttonLabel}
|
||||
</EuiButton>
|
||||
</EuiFlexItem>
|
||||
<EuiFlexItem grow={false}>
|
||||
<EuiText>
|
||||
<EuiBadge>{progress}%</EuiBadge>
|
||||
<EuiBadge data-test-subj="responseStreamProgressBadge">{progress}%</EuiBadge>
|
||||
</EuiText>
|
||||
</EuiFlexItem>
|
||||
<EuiFlexItem>
|
||||
|
@ -119,35 +122,11 @@ export const PageReducerStream: FC = () => {
|
|||
</EuiFlexItem>
|
||||
</EuiFlexGroup>
|
||||
<EuiSpacer />
|
||||
<div style={{ height: '300px' }}>
|
||||
<Chart>
|
||||
<Settings
|
||||
// TODO connect to charts.theme service see src/plugins/charts/public/services/theme/README.md
|
||||
baseTheme={LEGACY_LIGHT_THEME}
|
||||
rotation={90}
|
||||
/>
|
||||
<Axis id="entities" position={Position.Bottom} title="Commits" showOverlappingTicks />
|
||||
<Axis id="left2" title="Developers" position={Position.Left} />
|
||||
|
||||
<BarSeries
|
||||
id="commits"
|
||||
xScaleType={ScaleType.Linear}
|
||||
yScaleType={ScaleType.Linear}
|
||||
xAccessor="x"
|
||||
yAccessors={['y']}
|
||||
data={Object.entries(entities)
|
||||
.map(([x, y]) => {
|
||||
return {
|
||||
x,
|
||||
y,
|
||||
};
|
||||
})
|
||||
.sort((a, b) => b.y - a.y)}
|
||||
/>
|
||||
</Chart>
|
||||
</div>
|
||||
<BarChartRace entities={entities} />
|
||||
<EuiText>
|
||||
<p>{getStatusMessage(isRunning, isCancelled, data.progress)}</p>
|
||||
<p data-test-subj="responseStreamStatusMessage">
|
||||
{getStatusMessage(isRunning, isCancelled, progress)}
|
||||
</p>
|
||||
<EuiCheckbox
|
||||
id="responseStreamSimulateErrorsCheckbox"
|
||||
label="Simulate errors (gets applied to new streams only, not currently running ones)."
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import type { TypedUseSelectorHook } from 'react-redux';
|
||||
import { useDispatch, useSelector, useStore } from 'react-redux';
|
||||
import type { AppDispatch, AppStore, RootState } from './store';
|
||||
|
||||
// Use throughout your app instead of plain `useDispatch` and `useSelector`
|
||||
export const useAppDispatch: () => AppDispatch = useDispatch;
|
||||
export const useAppSelector: TypedUseSelectorHook<RootState> = useSelector;
|
||||
export const useAppStore: () => AppStore = useStore;
|
|
@ -0,0 +1,158 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import React, { useEffect, useRef, FC } from 'react';
|
||||
|
||||
import {
|
||||
EuiBadge,
|
||||
EuiButton,
|
||||
EuiCheckbox,
|
||||
EuiFlexGroup,
|
||||
EuiFlexItem,
|
||||
EuiProgress,
|
||||
EuiSpacer,
|
||||
EuiText,
|
||||
} from '@elastic/eui';
|
||||
|
||||
import { cancelStream, startStream } from '@kbn/ml-response-stream/client';
|
||||
|
||||
import { RESPONSE_STREAM_API_ENDPOINT } from '../../../../../common/api';
|
||||
import {
|
||||
setSimulateErrors,
|
||||
setCompressResponse,
|
||||
setFlushFix,
|
||||
} from '../../../../../common/api/redux_stream/options_slice';
|
||||
import { reset } from '../../../../../common/api/redux_stream/data_slice';
|
||||
|
||||
import { Page } from '../../../../components/page';
|
||||
import { useDeps } from '../../../../hooks/use_deps';
|
||||
|
||||
import { BarChartRace } from '../../components/bar_chart_race';
|
||||
import { getStatusMessage } from '../../components/get_status_message';
|
||||
|
||||
import { useAppDispatch, useAppSelector } from './hooks';
|
||||
|
||||
export const PageReduxStream: FC = () => {
|
||||
const {
|
||||
core: { http, notifications },
|
||||
} = useDeps();
|
||||
|
||||
const dispatch = useAppDispatch();
|
||||
const { isRunning, isCancelled, errors: streamErrors } = useAppSelector((s) => s.stream);
|
||||
const { progress, entities, errors } = useAppSelector((s) => s.data);
|
||||
const { simulateErrors, compressResponse, flushFix } = useAppSelector((s) => s.options);
|
||||
|
||||
const abortCtrl = useRef(new AbortController());
|
||||
|
||||
const onClickHandler = async () => {
|
||||
if (isRunning) {
|
||||
abortCtrl.current.abort();
|
||||
dispatch(cancelStream());
|
||||
} else {
|
||||
abortCtrl.current = new AbortController();
|
||||
dispatch(reset());
|
||||
dispatch(
|
||||
startStream({
|
||||
http,
|
||||
endpoint: RESPONSE_STREAM_API_ENDPOINT.REDUX_STREAM,
|
||||
apiVersion: '1',
|
||||
abortCtrl,
|
||||
body: { compressResponse, flushFix, simulateErrors },
|
||||
})
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// TODO This needs to be adapted as it might miss when error messages arrive
|
||||
// in bulk, but it should be good enough for this demo. This is for low level
|
||||
// errors on the HTTP level.Note this will only surface errors that happen for
|
||||
// the original request. Once the stream returns data, it will not be able to
|
||||
// return errors. This is why we need separate error handling for application
|
||||
// level errors.
|
||||
useEffect(() => {
|
||||
if (streamErrors.length > 0) {
|
||||
notifications.toasts.addDanger(streamErrors[streamErrors.length - 1]);
|
||||
}
|
||||
}, [streamErrors, notifications.toasts]);
|
||||
|
||||
// TODO This approach needs to be adapted as it might miss when error messages arrive bulk.
|
||||
// This is for errors on the application level
|
||||
useEffect(() => {
|
||||
if (errors.length > 0) {
|
||||
notifications.toasts.addDanger(errors[errors.length - 1]);
|
||||
}
|
||||
}, [errors, notifications.toasts]);
|
||||
|
||||
const buttonLabel = isRunning ? 'Stop development' : 'Start development';
|
||||
|
||||
return (
|
||||
<Page title={'NDJSON Redux Toolkit stream'}>
|
||||
<EuiText>
|
||||
<p>
|
||||
This demonstrates integration of a single endpoint with streaming support with Redux
|
||||
Toolkit. The server and client share actions created via `createSlice`. The server sends a
|
||||
stream of NDJSON data to the client where each line is a redux action. The client then
|
||||
applies these actions to its state. The package `@kbn/ml-response-stream` exposes a slice
|
||||
of the state that can be used to start and cancel the stream. The `startStream` action is
|
||||
implemented as an async thunk that starts the stream and then dispatches received actions
|
||||
to the store. Hit "Start development" to trigger the bar chart race!
|
||||
</p>
|
||||
</EuiText>
|
||||
<br />
|
||||
<EuiFlexGroup alignItems="center">
|
||||
<EuiFlexItem grow={false}>
|
||||
<EuiButton
|
||||
data-test-subj="responseStreamStartButton"
|
||||
color="primary"
|
||||
size="s"
|
||||
onClick={onClickHandler}
|
||||
aria-label={buttonLabel}
|
||||
>
|
||||
{buttonLabel}
|
||||
</EuiButton>
|
||||
</EuiFlexItem>
|
||||
<EuiFlexItem grow={false}>
|
||||
<EuiText>
|
||||
<EuiBadge data-test-subj="responseStreamProgressBadge">{progress}%</EuiBadge>
|
||||
</EuiText>
|
||||
</EuiFlexItem>
|
||||
<EuiFlexItem>
|
||||
<EuiProgress value={progress} max={100} size="xs" />
|
||||
</EuiFlexItem>
|
||||
</EuiFlexGroup>
|
||||
<EuiSpacer />
|
||||
<BarChartRace entities={entities} />
|
||||
<EuiText>
|
||||
<p data-test-subj="responseStreamStatusMessage">
|
||||
{getStatusMessage(isRunning, isCancelled, progress)}
|
||||
</p>
|
||||
<EuiCheckbox
|
||||
id="responseStreamSimulateErrorsCheckbox"
|
||||
label="Simulate errors (gets applied to new streams only, not currently running ones)."
|
||||
checked={simulateErrors}
|
||||
onChange={(e) => dispatch(setSimulateErrors(!simulateErrors))}
|
||||
compressed
|
||||
/>
|
||||
<EuiCheckbox
|
||||
id="responseStreamCompressionCheckbox"
|
||||
label="Toggle compression setting for response stream."
|
||||
checked={compressResponse}
|
||||
onChange={(e) => dispatch(setCompressResponse(!compressResponse))}
|
||||
compressed
|
||||
/>
|
||||
<EuiCheckbox
|
||||
id="responseStreamFlushFixCheckbox"
|
||||
label="Toggle flushFix setting for response stream."
|
||||
checked={flushFix}
|
||||
onChange={(e) => dispatch(setFlushFix(!flushFix))}
|
||||
compressed
|
||||
/>
|
||||
</EuiText>
|
||||
</Page>
|
||||
);
|
||||
};
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import React, { type FC, type PropsWithChildren } from 'react';
|
||||
import { configureStore } from '@reduxjs/toolkit';
|
||||
import { Provider } from 'react-redux';
|
||||
|
||||
import { streamSlice } from '@kbn/ml-response-stream/client';
|
||||
|
||||
import { optionsSlice } from '../../../../../common/api/redux_stream/options_slice';
|
||||
import { dataSlice } from '../../../../../common/api/redux_stream/data_slice';
|
||||
|
||||
const reduxStore = configureStore({
|
||||
reducer: {
|
||||
// State of the stream: is it running, has it errored, etc.
|
||||
stream: streamSlice.reducer,
|
||||
// The actual data returned by the stream.
|
||||
data: dataSlice.reducer,
|
||||
// Options for the stream: simulate errors, compress response, etc.
|
||||
options: optionsSlice.reducer,
|
||||
},
|
||||
});
|
||||
|
||||
export const ReduxStreamProvider: FC<PropsWithChildren<{}>> = ({ children }) => (
|
||||
<Provider store={reduxStore}>{children}</Provider>
|
||||
);
|
||||
|
||||
// Infer the `RootState` and `AppDispatch` types from the store itself
|
||||
export type AppStore = typeof reduxStore;
|
||||
export type RootState = ReturnType<AppStore['getState']>;
|
||||
export type AppDispatch = AppStore['dispatch'];
|
|
@ -66,7 +66,13 @@ export const PageSimpleStringStream: FC = () => {
|
|||
<br />
|
||||
<EuiFlexGroup alignItems="center">
|
||||
<EuiFlexItem grow={false}>
|
||||
<EuiButton color="primary" size="s" onClick={onClickHandler} aria-label={buttonLabel}>
|
||||
<EuiButton
|
||||
data-test-subj="responseStreamStartButton"
|
||||
color="primary"
|
||||
size="s"
|
||||
onClick={onClickHandler}
|
||||
aria-label={buttonLabel}
|
||||
>
|
||||
{buttonLabel}
|
||||
</EuiButton>
|
||||
</EuiFlexItem>
|
||||
|
@ -81,7 +87,7 @@ export const PageSimpleStringStream: FC = () => {
|
|||
/>
|
||||
<EuiSpacer />
|
||||
<EuiText>
|
||||
<p>{data}</p>
|
||||
<p data-test-subj="responseStreamString">{data}</p>
|
||||
</EuiText>
|
||||
{errors.length > 0 && (
|
||||
<EuiCallOut title="Sorry, there was an error" color="danger" iconType="warning">
|
||||
|
|
|
@ -10,6 +10,8 @@ import React from 'react';
|
|||
import { PLUGIN_ID, PLUGIN_NAME } from '../common/constants';
|
||||
import { PageSimpleStringStream } from './containers/app/pages/page_simple_string_stream';
|
||||
import { PageReducerStream } from './containers/app/pages/page_reducer_stream';
|
||||
import { PageReduxStream } from './containers/app/pages/page_redux_stream';
|
||||
import { ReduxStreamProvider } from './containers/app/pages/page_redux_stream/store';
|
||||
|
||||
interface RouteSectionDef {
|
||||
title: string;
|
||||
|
@ -34,10 +36,19 @@ export const routes: RouteSectionDef[] = [
|
|||
component: <PageSimpleStringStream />,
|
||||
},
|
||||
{
|
||||
title: 'Reducer stream',
|
||||
id: 'reducer-stream',
|
||||
title: 'NDJSON useReducer stream',
|
||||
id: 'ndjson-usereducer-stream',
|
||||
component: <PageReducerStream />,
|
||||
},
|
||||
{
|
||||
title: 'NDJSON Redux Toolkit stream',
|
||||
id: 'ndjson-redux-toolkit-stream',
|
||||
component: (
|
||||
<ReduxStreamProvider>
|
||||
<PageReduxStream />
|
||||
</ReduxStreamProvider>
|
||||
),
|
||||
},
|
||||
],
|
||||
},
|
||||
];
|
||||
|
|
|
@ -9,7 +9,11 @@
|
|||
import { Plugin, PluginInitializerContext, CoreSetup, CoreStart, Logger } from '@kbn/core/server';
|
||||
import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
|
||||
|
||||
import { defineReducerStreamRoute, defineSimpleStringStreamRoute } from './routes';
|
||||
import {
|
||||
defineReducerStreamRoute,
|
||||
defineReduxStreamRoute,
|
||||
defineSimpleStringStreamRoute,
|
||||
} from './routes';
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-interface
|
||||
export interface ResponseStreamSetupPlugins {}
|
||||
|
@ -29,6 +33,7 @@ export class ResponseStreamPlugin implements Plugin {
|
|||
|
||||
void core.getStartServices().then(([_, depsStart]) => {
|
||||
defineReducerStreamRoute(router, this.logger);
|
||||
defineReduxStreamRoute(router, this.logger);
|
||||
defineSimpleStringStreamRoute(router, this.logger);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -7,4 +7,5 @@
|
|||
*/
|
||||
|
||||
export { defineReducerStreamRoute } from './reducer_stream';
|
||||
export { defineReduxStreamRoute } from './redux_stream';
|
||||
export { defineSimpleStringStreamRoute } from './single_string_stream';
|
||||
|
|
|
@ -11,14 +11,16 @@ import { streamFactory } from '@kbn/ml-response-stream/server';
|
|||
|
||||
import {
|
||||
errorAction,
|
||||
reducerStreamRequestBodySchema,
|
||||
updateProgressAction,
|
||||
addToEntityAction,
|
||||
deleteEntityAction,
|
||||
ReducerStreamApiAction,
|
||||
} from '../../common/api/reducer_stream';
|
||||
} from '../../common/api/reducer_stream/reducer_actions';
|
||||
import { reducerStreamRequestBodySchema } from '../../common/api/reducer_stream';
|
||||
import { RESPONSE_STREAM_API_ENDPOINT } from '../../common/api';
|
||||
|
||||
import { entities, getActions } from './shared';
|
||||
|
||||
export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
|
||||
router.versioned
|
||||
.post({
|
||||
|
@ -64,23 +66,7 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
|
|||
request.body.flushFix
|
||||
);
|
||||
|
||||
const entities = [
|
||||
'kimchy',
|
||||
's1monw',
|
||||
'martijnvg',
|
||||
'jasontedor',
|
||||
'nik9000',
|
||||
'javanna',
|
||||
'rjernst',
|
||||
'jrodewig',
|
||||
];
|
||||
|
||||
const actions = [...Array(19).fill('add'), 'delete'];
|
||||
|
||||
if (simulateError) {
|
||||
actions.push('throw-error');
|
||||
actions.push('emit-error');
|
||||
}
|
||||
const actions = getActions(simulateError);
|
||||
|
||||
let progress = 0;
|
||||
|
||||
|
@ -101,19 +87,28 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
|
|||
const randomEntity = entities[Math.floor(Math.random() * entities.length)];
|
||||
const randomAction = actions[Math.floor(Math.random() * actions.length)];
|
||||
|
||||
if (randomAction === 'add') {
|
||||
const randomCommits = Math.floor(Math.random() * 100);
|
||||
push(addToEntityAction(randomEntity, randomCommits));
|
||||
} else if (randomAction === 'delete') {
|
||||
push(deleteEntityAction(randomEntity));
|
||||
} else if (randomAction === 'throw-error') {
|
||||
// Throw an error. It should not crash Kibana!
|
||||
// It should be caught and logged to the Kibana server console.
|
||||
throw new Error('There was a (simulated) server side error!');
|
||||
} else if (randomAction === 'emit-error') {
|
||||
// Emit an error as a stream action.
|
||||
push(errorAction('(Simulated) error pushed to the stream'));
|
||||
return;
|
||||
switch (randomAction) {
|
||||
case 'add':
|
||||
const randomCommits = Math.floor(Math.random() * 100);
|
||||
push(addToEntityAction(randomEntity, randomCommits));
|
||||
break;
|
||||
|
||||
case 'delete':
|
||||
push(deleteEntityAction(randomEntity));
|
||||
break;
|
||||
|
||||
case 'throw-error':
|
||||
// Throw an error. It should not crash Kibana!
|
||||
// It should be caught and logged to the Kibana server console.
|
||||
// The stream will just stop but the client will note receive an error!
|
||||
// In practice this pattern should be avoided as it will just end
|
||||
// the stream without an explanation.
|
||||
throw new Error('There was a (simulated) server side error!');
|
||||
|
||||
case 'emit-error':
|
||||
// Emit an error as a stream action.
|
||||
push(errorAction('(Simulated) error pushed to the stream'));
|
||||
return;
|
||||
}
|
||||
|
||||
void pushStreamUpdate();
|
||||
|
|
126
examples/response_stream/server/routes/redux_stream.ts
Normal file
126
examples/response_stream/server/routes/redux_stream.ts
Normal file
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import type { IRouter, Logger } from '@kbn/core/server';
|
||||
import { streamFactory } from '@kbn/ml-response-stream/server';
|
||||
|
||||
import {
|
||||
updateProgress,
|
||||
addToEntity,
|
||||
deleteEntity,
|
||||
error,
|
||||
type ReduxStreamApiAction,
|
||||
} from '../../common/api/redux_stream/data_slice';
|
||||
import { reducerStreamRequestBodySchema } from '../../common/api/reducer_stream';
|
||||
import { RESPONSE_STREAM_API_ENDPOINT } from '../../common/api';
|
||||
|
||||
import { entities, getActions } from './shared';
|
||||
|
||||
export const defineReduxStreamRoute = (router: IRouter, logger: Logger) => {
|
||||
router.versioned
|
||||
.post({
|
||||
path: RESPONSE_STREAM_API_ENDPOINT.REDUX_STREAM,
|
||||
access: 'internal',
|
||||
})
|
||||
.addVersion(
|
||||
{
|
||||
version: '1',
|
||||
validate: {
|
||||
request: {
|
||||
body: reducerStreamRequestBodySchema,
|
||||
},
|
||||
},
|
||||
},
|
||||
async (context, request, response) => {
|
||||
const maxTimeoutMs = request.body.timeout ?? 250;
|
||||
const simulateError = request.body.simulateErrors ?? false;
|
||||
|
||||
let logMessageCounter = 1;
|
||||
|
||||
function logDebugMessage(msg: string) {
|
||||
logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`);
|
||||
logMessageCounter++;
|
||||
}
|
||||
|
||||
logDebugMessage('Starting stream.');
|
||||
|
||||
let shouldStop = false;
|
||||
request.events.aborted$.subscribe(() => {
|
||||
logDebugMessage('aborted$ subscription trigger.');
|
||||
shouldStop = true;
|
||||
});
|
||||
request.events.completed$.subscribe(() => {
|
||||
logDebugMessage('completed$ subscription trigger.');
|
||||
shouldStop = true;
|
||||
});
|
||||
|
||||
const { end, push, responseWithHeaders } = streamFactory<ReduxStreamApiAction>(
|
||||
request.headers,
|
||||
logger,
|
||||
request.body.compressResponse,
|
||||
request.body.flushFix
|
||||
);
|
||||
|
||||
const actions = getActions(simulateError);
|
||||
|
||||
let progress = 0;
|
||||
|
||||
async function pushStreamUpdate() {
|
||||
await new Promise((resolve) =>
|
||||
setTimeout(resolve, Math.floor(Math.random() * maxTimeoutMs))
|
||||
);
|
||||
try {
|
||||
progress++;
|
||||
|
||||
if (progress > 100 || shouldStop) {
|
||||
end();
|
||||
return;
|
||||
}
|
||||
|
||||
push(updateProgress(progress));
|
||||
|
||||
const randomEntity = entities[Math.floor(Math.random() * entities.length)];
|
||||
const randomAction = actions[Math.floor(Math.random() * actions.length)];
|
||||
|
||||
switch (randomAction) {
|
||||
case 'add':
|
||||
const randomCommits = Math.floor(Math.random() * 100);
|
||||
push(addToEntity({ entity: randomEntity, value: randomCommits }));
|
||||
break;
|
||||
|
||||
case 'delete':
|
||||
push(deleteEntity(randomEntity));
|
||||
break;
|
||||
|
||||
case 'throw-error':
|
||||
// Throw an error. It should not crash Kibana!
|
||||
// It should be caught and logged to the Kibana server console.
|
||||
// The stream will just stop but the client will note receive an error!
|
||||
// In practice this pattern should be avoided as it will just end
|
||||
// the stream without an explanation.
|
||||
throw new Error('There was a (simulated) server side error!');
|
||||
|
||||
case 'emit-error':
|
||||
// Emit an error as a stream action.
|
||||
push(error('(Simulated) error pushed to the stream'));
|
||||
return;
|
||||
}
|
||||
|
||||
void pushStreamUpdate();
|
||||
} catch (e) {
|
||||
logger.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
// do not call this using `await` so it will run asynchronously while we return the stream already.
|
||||
void pushStreamUpdate();
|
||||
|
||||
return response.ok(responseWithHeaders);
|
||||
}
|
||||
);
|
||||
};
|
29
examples/response_stream/server/routes/shared.ts
Normal file
29
examples/response_stream/server/routes/shared.ts
Normal file
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
export const entities = [
|
||||
'kimchy',
|
||||
's1monw',
|
||||
'martijnvg',
|
||||
'jasontedor',
|
||||
'nik9000',
|
||||
'javanna',
|
||||
'rjernst',
|
||||
'jrodewig',
|
||||
];
|
||||
|
||||
export const getActions = (simulateError: boolean) => {
|
||||
const actions = [...Array(19).fill('add'), 'delete'];
|
||||
|
||||
if (simulateError) {
|
||||
actions.push('throw-error');
|
||||
actions.push('emit-error');
|
||||
}
|
||||
|
||||
return actions;
|
||||
};
|
|
@ -31,6 +31,7 @@ export default async function ({ readConfigFile }) {
|
|||
require.resolve('./unified_field_list_examples'),
|
||||
require.resolve('./discover_customization_examples'),
|
||||
require.resolve('./error_boundary'),
|
||||
require.resolve('./response_stream'),
|
||||
],
|
||||
services: {
|
||||
...functionalConfig.get('services'),
|
||||
|
|
|
@ -10,7 +10,17 @@ import { FtrProviderContext } from '../../functional/ftr_provider_context';
|
|||
|
||||
// eslint-disable-next-line import/no-default-export
|
||||
export default function ({ getService, getPageObjects, loadTestFile }: FtrProviderContext) {
|
||||
describe('response stream', function () {
|
||||
const browser = getService('browser');
|
||||
const PageObjects = getPageObjects(['common', 'header']);
|
||||
|
||||
describe('response-stream', function () {
|
||||
before(async () => {
|
||||
await browser.setWindowSize(1300, 900);
|
||||
await PageObjects.common.navigateToApp('response-stream', { insertTimestamp: false });
|
||||
});
|
||||
|
||||
loadTestFile(require.resolve('./string_stream'));
|
||||
loadTestFile(require.resolve('./reducer_stream'));
|
||||
loadTestFile(require.resolve('./redux_stream'));
|
||||
});
|
||||
}
|
||||
|
|
|
@ -6,84 +6,47 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import fetch from 'node-fetch';
|
||||
import { format as formatUrl } from 'url';
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
|
||||
import { FtrProviderContext } from '../../functional/ftr_provider_context';
|
||||
|
||||
import { parseStream } from './parse_stream';
|
||||
|
||||
// eslint-disable-next-line import/no-default-export
|
||||
export default ({ getService }: FtrProviderContext) => {
|
||||
const supertest = getService('supertest');
|
||||
const config = getService('config');
|
||||
const kibanaServerUrl = formatUrl(config.get('servers.kibana'));
|
||||
export default function ({ getService, getPageObjects }: FtrProviderContext) {
|
||||
const testSubjects = getService('testSubjects');
|
||||
const retry = getService('retry');
|
||||
|
||||
describe('POST /internal/response_stream/reducer_stream', () => {
|
||||
it('should return full data without streaming', async () => {
|
||||
const resp = await supertest
|
||||
.post('/internal/response_stream/reducer_stream')
|
||||
.set('kbn-xsrf', 'kibana')
|
||||
.send({
|
||||
timeout: 1,
|
||||
})
|
||||
.expect(200);
|
||||
describe('useReducer stream example', () => {
|
||||
it('navigates to the example', async () => {
|
||||
await testSubjects.click('ndjson-usereducer-stream');
|
||||
|
||||
expect(Buffer.isBuffer(resp.body)).to.be(true);
|
||||
|
||||
const chunks: string[] = resp.body.toString().split('\n');
|
||||
|
||||
expect(chunks.length).to.be(201);
|
||||
|
||||
const lastChunk = chunks.pop();
|
||||
expect(lastChunk).to.be('');
|
||||
|
||||
let data: any[] = [];
|
||||
|
||||
expect(() => {
|
||||
data = chunks.map((c) => JSON.parse(c));
|
||||
}).not.to.throwError();
|
||||
|
||||
data.forEach((d) => {
|
||||
expect(typeof d.type).to.be('string');
|
||||
await retry.try(async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be(
|
||||
'NDJSON useReducer stream'
|
||||
);
|
||||
expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('0%');
|
||||
expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be(
|
||||
'Development did not start yet.'
|
||||
);
|
||||
});
|
||||
|
||||
const progressData = data.filter((d) => d.type === 'update_progress');
|
||||
expect(progressData.length).to.be(100);
|
||||
expect(progressData[0].payload).to.be(1);
|
||||
expect(progressData[progressData.length - 1].payload).to.be(100);
|
||||
});
|
||||
|
||||
it('should return data in chunks with streaming', async () => {
|
||||
const response = await fetch(`${kibanaServerUrl}/internal/response_stream/reducer_stream`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'kbn-xsrf': 'stream',
|
||||
},
|
||||
body: JSON.stringify({ timeout: 1 }),
|
||||
it('starts the stream', async () => {
|
||||
await testSubjects.click('responseStreamStartButton');
|
||||
|
||||
await retry.try(async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).not.to.be('0%');
|
||||
expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be(
|
||||
'Development is ongoing, the hype is real!'
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
const stream = response.body;
|
||||
|
||||
expect(stream).not.to.be(null);
|
||||
|
||||
if (stream !== null) {
|
||||
const progressData: any[] = [];
|
||||
|
||||
for await (const action of parseStream(stream)) {
|
||||
expect(action.type).not.to.be('error');
|
||||
if (action.type === 'update_progress') {
|
||||
progressData.push(action);
|
||||
}
|
||||
}
|
||||
|
||||
expect(progressData.length).to.be(100);
|
||||
expect(progressData[0].payload).to.be(1);
|
||||
expect(progressData[progressData.length - 1].payload).to.be(100);
|
||||
}
|
||||
it('finishes the stream', async () => {
|
||||
await retry.tryForTime(60000, async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('100%');
|
||||
expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be(
|
||||
'Development completed, the release got out the door!'
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
||||
}
|
||||
|
|
52
test/examples/response_stream/redux_stream.ts
Normal file
52
test/examples/response_stream/redux_stream.ts
Normal file
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import { FtrProviderContext } from '../../functional/ftr_provider_context';
|
||||
|
||||
// eslint-disable-next-line import/no-default-export
|
||||
export default function ({ getService, getPageObjects }: FtrProviderContext) {
|
||||
const testSubjects = getService('testSubjects');
|
||||
const retry = getService('retry');
|
||||
|
||||
describe('redux stream example', () => {
|
||||
it('navigates to the example', async () => {
|
||||
await testSubjects.click('ndjson-redux-toolkit-stream');
|
||||
|
||||
await retry.try(async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be(
|
||||
'NDJSON Redux Toolkit stream'
|
||||
);
|
||||
expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('0%');
|
||||
expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be(
|
||||
'Development did not start yet.'
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it('starts the stream', async () => {
|
||||
await testSubjects.click('responseStreamStartButton');
|
||||
|
||||
await retry.try(async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).not.to.be('0%');
|
||||
expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be(
|
||||
'Development is ongoing, the hype is real!'
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it('finishes the stream', async () => {
|
||||
await retry.tryForTime(60000, async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamProgressBadge')).to.be('100%');
|
||||
expect(await testSubjects.getVisibleText('responseStreamStatusMessage')).to.be(
|
||||
'Development completed, the release got out the door!'
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
46
test/examples/response_stream/string_stream.ts
Normal file
46
test/examples/response_stream/string_stream.ts
Normal file
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import { FtrProviderContext } from '../../functional/ftr_provider_context';
|
||||
|
||||
// eslint-disable-next-line import/no-default-export
|
||||
export default function ({ getService, getPageObjects }: FtrProviderContext) {
|
||||
const testSubjects = getService('testSubjects');
|
||||
const retry = getService('retry');
|
||||
|
||||
describe('string stream example', () => {
|
||||
it('navigates to the example', async () => {
|
||||
await testSubjects.click('simple-string-stream');
|
||||
|
||||
await retry.try(async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamPageTitle')).to.be(
|
||||
'Simple string stream'
|
||||
);
|
||||
expect(await testSubjects.exists('responseStreamStartButton')).to.be(true);
|
||||
expect(await testSubjects.getVisibleText('responseStreamString')).to.be('');
|
||||
});
|
||||
});
|
||||
|
||||
it('starts the stream', async () => {
|
||||
await testSubjects.click('responseStreamStartButton');
|
||||
|
||||
await retry.try(async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamString')).not.to.be('');
|
||||
});
|
||||
});
|
||||
|
||||
it('finishes the stream', async () => {
|
||||
await retry.tryForTime(60000, async () => {
|
||||
expect(await testSubjects.getVisibleText('responseStreamString')).to.be(
|
||||
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.'
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
8
x-pack/packages/ml/response_stream/client/constants.ts
Normal file
8
x-pack/packages/ml/response_stream/client/constants.ts
Normal file
|
@ -0,0 +1,8 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
export const DATA_THROTTLE_MS = 100;
|
|
@ -6,4 +6,5 @@
|
|||
*/
|
||||
|
||||
export { fetchStream } from './fetch_stream';
|
||||
export { cancelStream, startStream, streamSlice } from './stream_slice';
|
||||
export { useFetchStream } from './use_fetch_stream';
|
||||
|
|
148
x-pack/packages/ml/response_stream/client/stream_slice.ts
Normal file
148
x-pack/packages/ml/response_stream/client/stream_slice.ts
Normal file
|
@ -0,0 +1,148 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { AnyAction, PayloadAction } from '@reduxjs/toolkit';
|
||||
import { createAsyncThunk, createSlice } from '@reduxjs/toolkit';
|
||||
import { batch } from 'react-redux';
|
||||
|
||||
import type { HttpSetup, HttpFetchOptions } from '@kbn/core/public';
|
||||
import { fetchStream } from './fetch_stream';
|
||||
import { DATA_THROTTLE_MS } from './constants';
|
||||
|
||||
/**
|
||||
* Async thunk to start the stream.
|
||||
*/
|
||||
export const startStream = createAsyncThunk(
|
||||
'stream/start',
|
||||
async (
|
||||
options: {
|
||||
http: HttpSetup;
|
||||
endpoint: string;
|
||||
apiVersion?: string;
|
||||
abortCtrl: React.MutableRefObject<AbortController>;
|
||||
body?: any;
|
||||
headers?: HttpFetchOptions['headers'];
|
||||
},
|
||||
thunkApi
|
||||
) => {
|
||||
const { http, endpoint, apiVersion, abortCtrl, body, headers } = options;
|
||||
|
||||
const fetchState = { isActive: true };
|
||||
|
||||
// Custom buffering to avoid hammering the DOM with updates.
|
||||
// We can revisit this once Kibana is on React 18.
|
||||
const actionBuffer: AnyAction[] = [];
|
||||
function flushBuffer(withTimeout = true) {
|
||||
batch(() => {
|
||||
for (const action of actionBuffer) {
|
||||
thunkApi.dispatch(action);
|
||||
}
|
||||
});
|
||||
actionBuffer.length = 0;
|
||||
|
||||
if (withTimeout) {
|
||||
setTimeout(() => {
|
||||
if (fetchState.isActive) {
|
||||
flushBuffer();
|
||||
}
|
||||
}, DATA_THROTTLE_MS);
|
||||
}
|
||||
}
|
||||
|
||||
flushBuffer();
|
||||
|
||||
for await (const [fetchStreamError, action] of fetchStream(
|
||||
http,
|
||||
endpoint,
|
||||
apiVersion,
|
||||
abortCtrl,
|
||||
body,
|
||||
true,
|
||||
headers
|
||||
)) {
|
||||
if (fetchStreamError !== null) {
|
||||
actionBuffer.push(addError(fetchStreamError));
|
||||
} else if (action) {
|
||||
actionBuffer.push(action);
|
||||
}
|
||||
}
|
||||
|
||||
fetchState.isActive = false;
|
||||
flushBuffer(false);
|
||||
},
|
||||
{
|
||||
condition: (_, { getState }) => {
|
||||
// This is a bit of a hack to prevent instant restarts while the stream is running.
|
||||
// The problem is that in RTK v1, async thunks cannot be made part of the slice,
|
||||
// so they will not know the namespace used where they run in. We just assume
|
||||
// `stream` here as the namespace, if it's a custom one, this will not work.
|
||||
// RTK v2 will allow async thunks to be part of the slice, a draft PR to upgrade
|
||||
// is up there: https://github.com/elastic/kibana/pull/178986
|
||||
try {
|
||||
const s = getState() as { stream?: StreamState };
|
||||
|
||||
if (s.stream === undefined) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// If the stream was running, the extra reducers will also have set
|
||||
// and error, so we need to prevent the stream from starting again.
|
||||
if (s.stream.isRunning && s.stream.errors.length > 0) {
|
||||
return false;
|
||||
}
|
||||
} catch (e) {
|
||||
return true;
|
||||
}
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
export interface StreamState {
|
||||
errors: string[];
|
||||
isCancelled: boolean;
|
||||
isRunning: boolean;
|
||||
}
|
||||
|
||||
function getDefaultState(): StreamState {
|
||||
return {
|
||||
errors: [],
|
||||
isCancelled: false,
|
||||
isRunning: false,
|
||||
};
|
||||
}
|
||||
|
||||
export const streamSlice = createSlice({
|
||||
name: 'stream',
|
||||
initialState: getDefaultState(),
|
||||
reducers: {
|
||||
addError: (state: StreamState, action: PayloadAction<string>) => {
|
||||
state.errors.push(action.payload);
|
||||
},
|
||||
cancelStream: (state: StreamState) => {
|
||||
state.isCancelled = true;
|
||||
state.isRunning = false;
|
||||
},
|
||||
},
|
||||
extraReducers: (builder) => {
|
||||
builder.addCase(startStream.pending, (state) => {
|
||||
if (state.isRunning) {
|
||||
state.errors.push('Instant restart while running not supported yet.');
|
||||
return;
|
||||
}
|
||||
|
||||
state.errors = [];
|
||||
state.isCancelled = false;
|
||||
state.isRunning = true;
|
||||
});
|
||||
builder.addCase(startStream.fulfilled, (state) => {
|
||||
state.isRunning = false;
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
// Action creators are generated for each case reducer function
|
||||
export const { addError, cancelStream } = streamSlice.actions;
|
|
@ -19,8 +19,7 @@ import { isPopulatedObject } from '@kbn/ml-is-populated-object';
|
|||
|
||||
import { fetchStream } from './fetch_stream';
|
||||
import { stringReducer, type StringReducer } from './string_reducer';
|
||||
|
||||
const DATA_THROTTLE_MS = 100;
|
||||
import { DATA_THROTTLE_MS } from './constants';
|
||||
|
||||
// This pattern with a dual ternary allows us to default to StringReducer
|
||||
// and if a custom reducer is supplied fall back to that one instead.
|
||||
|
@ -80,7 +79,8 @@ export function useFetchStream<B extends object, R extends Reducer<any, any>>(
|
|||
// a lot of unnecessary re-renders even in combination with `useThrottle`.
|
||||
// We're now using `dataRef` to allow updates outside of the render cycle.
|
||||
// When the stream is running, we'll update `data` with the `dataRef` value
|
||||
// periodically.
|
||||
// periodically. This will get simpler with React 18 where we
|
||||
// can make use of `useDeferredValue`.
|
||||
const [data, setData] = useState(reducerWithFallback.initialState);
|
||||
const dataRef = useRef(reducerWithFallback.initialState);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue