Update "Gen AI Streaming Response Example" to make use of @kbn/ml-response-stream. (#162609)

## Summary

Follow up to #161676.

Since we added support for handling SSE-streams returned by OpenAI APIs
to `@kbn/ml-response-stream` in #162335, this updates the "Gen AI
Streaming Response" developer example to make use of the custom hook
`useFetchStream()` from `@kbn/ml-response-stream` instead of its inline
code to process the stream on the client.

It also improves the refresh behaviour: Previously, once a prompt was
entered and the accordion opened, every keystroke in the prompt textarea
would cause a new request to the AI endpoint. This update adds a
`Refresh prompt` to only do a new request on this user action. Support
for cancelling requests when unmounting the `StreamingResponse`
component was also added.


![gen-ai-streaming-0001](af81d3ac-cc03-4550-9aa5-8685c15304cd)

### Checklist

- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Walter Rafelsberger 2023-07-28 10:45:14 +02:00 committed by GitHub
parent d213ed274c
commit 45f7a0b2fc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 129 deletions

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import React, { useEffect, useMemo, useState } from 'react';
import React, { useEffect, useState } from 'react';
import {
EuiFlexGroup,
EuiFlexItem,
@ -20,22 +20,14 @@ import {
import { i18n } from '@kbn/i18n';
import { css } from '@emotion/react';
import { CoreStart } from '@kbn/core/public';
import { concatMap, delay, Observable, of } from 'rxjs';
import useObservable from 'react-use/lib/useObservable';
import { useFetchStream } from '@kbn/ml-response-stream/client';
export interface StreamingResponseProps {
http: CoreStart['http'];
prompt: string;
selectedConnectorId: string;
initialIsOpen?: boolean;
}
export interface PromptObservableState {
chunks: Chunk[];
message?: string;
error?: string;
loading: boolean;
}
interface ChunkChoice {
index: 0;
delta: { role: string; content: string };
@ -75,117 +67,42 @@ export const StreamingResponse = ({
http,
prompt,
selectedConnectorId,
initialIsOpen = false,
}: StreamingResponseProps) => {
const { euiTheme } = useEuiTheme();
const [hasOpened, setHasOpened] = useState(false);
const [hasOpened, setHasOpened] = useState(initialIsOpen);
const response$ = useMemo(() => {
return hasOpened
? new Observable<PromptObservableState>((observer) => {
observer.next({ chunks: [], loading: true });
const { errors, start, cancel, data, isRunning } = useFetchStream(
http,
`/internal/examples/execute_gen_ai_connector`,
undefined,
{
connector_id: selectedConnectorId,
prompt,
},
{ reducer: streamReducer, initialState: '' }
);
http
.post(`/internal/examples/execute_gen_ai_connector`, {
body: JSON.stringify({
connector_id: selectedConnectorId,
prompt,
}),
asResponse: true,
rawResponse: true,
})
.then((response) => {
const status = response.response?.status;
// Start fetching when the accordion was opened
useEffect(() => {
if (hasOpened && !isRunning && errors.length === 0 && data === '') {
start();
}
}, [data, errors, hasOpened, isRunning, start]);
if (!status || status >= 400) {
throw new Error(response.response?.statusText || 'Unexpected error');
}
// Cancel fetching when the component unmounts
// eslint-disable-next-line react-hooks/exhaustive-deps
useEffect(() => cancel, []);
const reader = response.response.body?.getReader();
if (!reader) {
throw new Error('Could not get reader from response');
}
const decoder = new TextDecoder();
const chunks: Chunk[] = [];
let prev: string = '';
function read() {
reader!.read().then(({ done, value }: { done: boolean; value?: Uint8Array }) => {
try {
if (done) {
observer.next({
chunks,
message: getMessageFromChunks(chunks),
loading: false,
});
observer.complete();
return;
}
let lines: string[] = (prev + decoder.decode(value)).split('\n');
const lastLine: string = lines[lines.length - 1];
const isPartialChunk: boolean = !!lastLine && lastLine !== 'data: [DONE]';
if (isPartialChunk) {
prev = lastLine;
lines.pop();
} else {
prev = '';
}
lines = lines
.map((str) => str.substr(6))
.filter((str) => !!str && str !== '[DONE]');
const nextChunks: Chunk[] = lines.map((line) => JSON.parse(line));
nextChunks.forEach((chunk) => {
chunks.push(chunk);
observer.next({
chunks,
message: getMessageFromChunks(chunks),
loading: true,
});
});
} catch (err) {
observer.error(err);
return;
}
read();
});
}
read();
return () => {
reader.cancel();
};
})
.catch((err) => {
observer.next({ chunks: [], error: err.message, loading: false });
});
}).pipe(concatMap((value) => of(value).pipe(delay(50))))
: new Observable<PromptObservableState>(() => {});
}, [http, hasOpened, prompt, selectedConnectorId]);
const response = useObservable(response$);
useEffect(() => {}, [response$]);
let content = response?.message ?? '';
let content = data;
let state: 'init' | 'loading' | 'streaming' | 'error' | 'complete' = 'init';
if (response?.loading) {
if (isRunning) {
state = content ? 'streaming' : 'loading';
} else if (response && 'error' in response && response.error) {
} else if (errors.length > 0) {
state = 'error';
content = response.error;
content = errors.join('\n');
} else if (content) {
state = 'complete';
}
@ -252,7 +169,7 @@ export const StreamingResponse = ({
</EuiFlexItem>
</EuiFlexGroup>
}
initialIsOpen={false}
initialIsOpen={initialIsOpen}
onToggle={() => {
setHasOpened(true);
}}
@ -264,10 +181,9 @@ export const StreamingResponse = ({
);
};
function getMessageFromChunks(chunks: Chunk[]) {
let message = '';
chunks.forEach((chunk) => {
message += chunk.choices[0]?.delta.content ?? '';
});
return message;
function streamReducer(state: string, action: Chunk | Chunk[]) {
return `${state}${[action]
.flat()
.map((chunk) => chunk.choices[0]?.delta.content ?? '')
.join('')}`;
}

View file

@ -75,6 +75,22 @@ export const GenAiStreamingResponseExampleApp = ({
const [selectedConnectorId, setSelectedConnectorId] = useState<string>('');
const [prompt, setPrompt] = useState<string>();
// The refresh behavior is implemented like this:
//
// - On refresh the current prompt gets stored in `toBeRefreshedPrompt` and
// and then the actual prompt gets cleared. React will rerender and unmount
// the `StreamingResponse` component because of the empty prompt.
// - A `useEffect` then checks if `toBeRefreshedPrompt` is populated
// and will apply it to `prompt` again.
// - In combination with `initialIsOpen=true` this will cause the
// `StreamingResponse` component to start fetching a new response
// immediately after remounting.
//
// This pattern requires a bit more logic in this component but it avoids
// tight coupling via callbacks of this component and `StreamingResponse`.
const [toBeRefreshedPrompt, setToBeRefreshedPrompt] = useState<string>();
const [initialIsOpen, setInitialIsOpen] = useState(false);
const getConnectors = useCallback(async () => {
const result = await loadGenAiConnectors({ http });
setConnectors(result);
@ -103,7 +119,23 @@ export const GenAiStreamingResponseExampleApp = ({
[setPrompt]
);
const clearPrompt = useCallback(() => setPrompt(''), [setPrompt]);
const clearPromptHandler = () => {
setInitialIsOpen(false);
setPrompt('');
};
const refreshPromptHandler = () => {
setInitialIsOpen(true);
setToBeRefreshedPrompt(prompt);
setPrompt('');
};
useEffect(() => {
if (prompt === '' && toBeRefreshedPrompt && toBeRefreshedPrompt !== '') {
setPrompt(toBeRefreshedPrompt);
setToBeRefreshedPrompt('');
}
}, [prompt, toBeRefreshedPrompt]);
return (
<EuiPage>
@ -150,16 +182,27 @@ export const GenAiStreamingResponseExampleApp = ({
defaultMessage: 'Enter a prompt',
})}
labelAppend={
<EuiText size="xs">
<EuiLink onClick={clearPrompt}>
{i18n.translate(
'genAiStreamingResponseExample.app.component.userPromptLabelAppend',
{
defaultMessage: 'Clear prompt',
}
)}
</EuiLink>
</EuiText>
<>
<EuiText size="xs">
<EuiLink onClick={clearPromptHandler}>
{i18n.translate(
'genAiStreamingResponseExample.app.component.userPromptLabelAppendClearPrompt',
{
defaultMessage: 'Clear prompt',
}
)}
</EuiLink>{' '}
|{' '}
<EuiLink onClick={refreshPromptHandler}>
{i18n.translate(
'genAiStreamingResponseExample.app.component.userPromptLabelAppendRefreshPrompt',
{
defaultMessage: 'Refresh prompt',
}
)}
</EuiLink>
</EuiText>
</>
}
>
<EuiTextArea
@ -182,6 +225,7 @@ export const GenAiStreamingResponseExampleApp = ({
http={http}
prompt={prompt}
selectedConnectorId={selectedConnectorId}
initialIsOpen={initialIsOpen}
/>
)}
</EuiFlexItem>

View file

@ -25,5 +25,6 @@
"@kbn/charts-plugin",
"@kbn/data-plugin",
"@kbn/actions-plugin",
"@kbn/ml-response-stream",
]
}