[Response Ops][Actions] Allow streaming responses from Generative AI connector (#161676)

Resolves https://github.com/elastic/kibana/issues/159598

## Summary

This PR modifies the `test` subaction of the Generative AI connector to
accept a `stream` parameter (default: `false`) that allows for a
streaming response.

The Generative AI connector is basically a pass-through to the Open
AI/Azure OpenAI APIs, where the `stream` parameter is passed in via the
body of the request. This means that with the existing connector, users
could specify `stream: true` in the body, which would lead to unexpected
results because the action is not prepared to handle streaming results. This
PR sanitizes the body that is passed in the `run` subaction to prevent
the `stream` parameter from being set to `true` and explicitly sets the
`stream` parameter for the `test` subaction.

In order to test the streaming response, I created an example plugin
that prompts users to create a Generative AI connector if one does not
exist and then executes actions using the connector with `stream` set to
`true`. This borrows liberally from @dgieselaar's existing work from
https://github.com/elastic/kibana/pull/158678


441694cb-0154-4450-bd93-3907c4a9995c



## To Verify

1. Navigate to https://localhost:5601/app/GenAiStreamingResponseExample
2. Set up a Generative AI connector
3. Open the network console. Enter a prompt and click `Stream Response`
4. You should see the chat response return streaming results.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2023-07-14 19:17:12 -04:00 committed by GitHub
parent 818efd2554
commit 8a56a2bbaa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 1957 additions and 56 deletions

1
.github/CODEOWNERS vendored
View file

@ -400,6 +400,7 @@ src/plugins/ftr_apis @elastic/kibana-core
packages/kbn-ftr-common-functional-services @elastic/kibana-operations @elastic/appex-qa
packages/kbn-ftr-screenshot-filename @elastic/kibana-operations @elastic/appex-qa
x-pack/test/functional_with_es_ssl/plugins/cases @elastic/response-ops
x-pack/examples/gen_ai_streaming_response_example @elastic/response-ops
packages/kbn-generate @elastic/kibana-operations
packages/kbn-generate-console-definitions @elastic/platform-deployment-management
packages/kbn-generate-csv @elastic/appex-sharedux

View file

@ -429,6 +429,7 @@
"@kbn/foo-plugin": "link:x-pack/test/ui_capabilities/common/plugins/foo_plugin",
"@kbn/ftr-apis-plugin": "link:src/plugins/ftr_apis",
"@kbn/functional-with-es-ssl-cases-test-plugin": "link:x-pack/test/functional_with_es_ssl/plugins/cases",
"@kbn/gen-ai-streaming-response-example-plugin": "link:x-pack/examples/gen_ai_streaming_response_example",
"@kbn/generate-console-definitions": "link:packages/kbn-generate-console-definitions",
"@kbn/generate-csv": "link:packages/kbn-generate-csv",
"@kbn/generate-csv-types": "link:packages/kbn-generate-csv-types",

View file

@ -794,6 +794,8 @@
"@kbn/ftr-screenshot-filename/*": ["packages/kbn-ftr-screenshot-filename/*"],
"@kbn/functional-with-es-ssl-cases-test-plugin": ["x-pack/test/functional_with_es_ssl/plugins/cases"],
"@kbn/functional-with-es-ssl-cases-test-plugin/*": ["x-pack/test/functional_with_es_ssl/plugins/cases/*"],
"@kbn/gen-ai-streaming-response-example-plugin": ["x-pack/examples/gen_ai_streaming_response_example"],
"@kbn/gen-ai-streaming-response-example-plugin/*": ["x-pack/examples/gen_ai_streaming_response_example/*"],
"@kbn/generate": ["packages/kbn-generate"],
"@kbn/generate/*": ["packages/kbn-generate/*"],
"@kbn/generate-console-definitions": ["packages/kbn-generate-console-definitions"],

View file

@ -0,0 +1,5 @@
## Generative AI Connector Streaming Response Example
This example plugin shows you how to stream a response from a Generative AI connector.
To run this example, use the command `yarn start --run-examples`.

View file

@ -0,0 +1,18 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

// Jest configuration for the gen_ai_streaming_response_example plugin.
// Uses the shared Kibana preset; all paths are resolved from the repo root
// (`rootDir` is three levels up from this file).
module.exports = {
  preset: '@kbn/test',
  rootDir: '../../..',
  roots: ['<rootDir>/x-pack/examples/gen_ai_streaming_response_example'],
  coverageDirectory:
    '<rootDir>/target/kibana-coverage/jest/x-pack/examples/gen_ai_streaming_response_example',
  coverageReporters: ['text', 'html'],
  // Only collect coverage for this plugin's browser and server sources.
  collectCoverageFrom: [
    '<rootDir>/x-pack/examples/gen_ai_streaming_response_example/{public,server}/**/*.{ts,tsx}',
  ],
};

View file

@ -0,0 +1,20 @@
{
"type": "plugin",
"id": "@kbn/gen-ai-streaming-response-example-plugin",
"owner": "@elastic/response-ops",
"plugin": {
"id": "genAiStreamingResponseExample",
"server": true,
"browser": true,
"requiredPlugins": [
"triggersActionsUi",
"actions",
"kibanaReact",
"developerExamples"
],
"requiredBundles": [
"kibanaReact",
"stackConnectors"
]
}
}

View file

@ -0,0 +1,29 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import React from 'react';
import ReactDOM from 'react-dom';
import { AppMountParameters, CoreStart } from '@kbn/core/public';
import { KibanaContextProvider } from '@kbn/kibana-react-plugin/public';
import { GenAiStreamingResponseExampleApp } from './gen_ai_streaming_response_example';
import { GenAiStreamingResponseExamplePublicStartDeps } from './plugin';

/**
 * Mounts the example app into the element provided by core and returns the
 * unmount callback core invokes when the user navigates away.
 */
export const renderApp = (
  core: CoreStart,
  deps: GenAiStreamingResponseExamplePublicStartDeps,
  { element }: AppMountParameters
) => {
  const { http } = core;
  ReactDOM.render(
    // Expose core services and plugin start contracts to descendants via
    // the Kibana React context provider.
    <KibanaContextProvider services={{ ...core, ...deps }}>
      <GenAiStreamingResponseExampleApp http={http} triggersActionsUi={deps.triggersActionsUi} />
    </KibanaContextProvider>,
    element
  );
  return () => ReactDOM.unmountComponentAtNode(element);
};

View file

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { useCallback, useMemo, useState } from 'react';
import { EuiFlexItem, EuiFormRow, EuiLink, EuiSuperSelect, EuiText } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { LoadConnectorResult } from '../gen_ai_streaming_response_example';
export interface ListConnectorsProps {
connectors: LoadConnectorResult[];
setIsConnectorModalVisible: React.Dispatch<React.SetStateAction<boolean>>;
onConnectorSelect: React.Dispatch<React.SetStateAction<string>>;
}
export const ListConnectors = ({
connectors,
onConnectorSelect,
setIsConnectorModalVisible,
}: ListConnectorsProps) => {
const [value, setValue] = useState<string>();
const connectorOptions = useMemo(() => {
return (
connectors.map((connector) => {
return {
value: connector.id,
inputDisplay: connector.name,
dropdownDisplay: (
<React.Fragment key={connector.id}>
<strong>{connector.name}</strong>
</React.Fragment>
),
};
}) ?? []
);
}, [connectors]);
const onSelectConnector = useCallback(
(v: string) => {
setValue(v);
onConnectorSelect(v);
},
[onConnectorSelect]
);
return (
<>
<EuiFlexItem>
<EuiFormRow
label={i18n.translate(
'genAiStreamingResponseExample.app.component.selectConnectorLabel',
{
defaultMessage: 'Select a Generative AI Connector',
}
)}
labelAppend={
<EuiText size="xs">
<EuiLink onClick={() => setIsConnectorModalVisible(true)}>
{i18n.translate(
'genAiStreamingResponseExample.app.component.selectConnectorLabelAppend',
{
defaultMessage: 'or add another',
}
)}
</EuiLink>
</EuiText>
}
>
<EuiSuperSelect
valueOfSelected={value}
options={connectorOptions}
onChange={onSelectConnector}
/>
</EuiFormRow>
</EuiFlexItem>
</>
);
};

View file

@ -0,0 +1,41 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import React from 'react';
import { GenAiLogo } from '@kbn/stack-connectors-plugin/public/common';
import { EuiFlexGroup, EuiCard, EuiFlexItem, EuiIcon } from '@elastic/eui';
import { i18n } from '@kbn/i18n';

export interface SetupConnectorProps {
  // Parent-owned setter that shows/hides the "add connector" modal.
  setIsConnectorModalVisible: React.Dispatch<React.SetStateAction<boolean>>;
}

/**
 * Empty-state card shown when no Generative AI connector exists yet;
 * clicking the card opens the connector creation modal.
 */
export const SetupConnector = ({ setIsConnectorModalVisible }: SetupConnectorProps) => {
  return (
    <EuiFlexGroup gutterSize="l">
      <EuiFlexItem grow={false}>
        <EuiCard
          layout="horizontal"
          icon={<EuiIcon size="xl" type={GenAiLogo} />}
          title={i18n.translate(
            'genAiStreamingResponseExample.app.component.addConnectorCardTitle',
            {
              defaultMessage: 'Add Generative AI Connector',
            }
          )}
          description={i18n.translate(
            'genAiStreamingResponseExample.app.component.addConnectorCardDescription',
            {
              defaultMessage: 'Configure a connector to continue',
            }
          )}
          onClick={() => setIsConnectorModalVisible(true)}
        />
      </EuiFlexItem>
    </EuiFlexGroup>
  );
};

View file

@ -0,0 +1,273 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { useEffect, useMemo, useState } from 'react';
import {
EuiFlexGroup,
EuiFlexItem,
EuiAccordion,
EuiPanel,
EuiSpacer,
EuiText,
useEuiTheme,
EuiLoadingSpinner,
EuiIcon,
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { css } from '@emotion/react';
import { CoreStart } from '@kbn/core/public';
import { concatMap, delay, Observable, of } from 'rxjs';
import useObservable from 'react-use/lib/useObservable';
// Props: Kibana HTTP service plus the prompt text and connector to execute.
export interface StreamingResponseProps {
  http: CoreStart['http'];
  prompt: string;
  selectedConnectorId: string;
}

// State emitted by the prompt observable as chunks stream in.
export interface PromptObservableState {
  chunks: Chunk[];
  message?: string;
  error?: string;
  loading: boolean;
}

// One choice within a streamed chat-completion chunk (OpenAI SSE format).
interface ChunkChoice {
  index: 0;
  delta: { role: string; content: string };
  finish_reason: null | string;
}

// A single parsed SSE chunk from the chat-completions stream.
interface Chunk {
  id: string;
  object: string;
  created: number;
  model: string;
  choices: ChunkChoice[];
}

// Blinking-cursor styles shown at the end of the text while streaming.
// NOTE(review): this is a plain (untagged) template string, and it is passed
// to `className` in the component below, which expects a class name rather
// than raw CSS — confirm the styles actually apply (an emotion `css` tag may
// have been intended).
const cursorCss = `
  @keyframes blink {
    0% {
      opacity: 1;
    }
    50% {
      opacity: 0;
    }
    100% {
      opacity: 1;
    }
  }

  animation: blink 1s infinite;
  width: 10px;
  height: 16px;
  vertical-align: middle;
  display: inline-block;
  background: rgba(0, 0, 0, 0.25);
`;
/**
 * Accordion that, once opened, POSTs the prompt to the example server route
 * and renders the connector's chunked (SSE) response incrementally, showing
 * a blinking cursor while text is still streaming.
 */
export const StreamingResponse = ({
  http,
  prompt,
  selectedConnectorId,
}: StreamingResponseProps) => {
  const { euiTheme } = useEuiTheme();
  // The request only fires after the accordion has been toggled at least once.
  const [hasOpened, setHasOpened] = useState(false);

  const response$ = useMemo(() => {
    return hasOpened
      ? new Observable<PromptObservableState>((observer) => {
          observer.next({ chunks: [], loading: true });
          http
            .post(`/internal/examples/execute_gen_ai_connector`, {
              body: JSON.stringify({
                connector_id: selectedConnectorId,
                prompt,
              }),
              // Request the raw Fetch response so the body can be read as a stream.
              asResponse: true,
              rawResponse: true,
            })
            .then((response) => {
              const status = response.response?.status;
              if (!status || status >= 400) {
                throw new Error(response.response?.statusText || 'Unexpected error');
              }
              const reader = response.response.body?.getReader();
              if (!reader) {
                throw new Error('Could not get reader from response');
              }
              const decoder = new TextDecoder();
              const chunks: Chunk[] = [];
              // Carry-over buffer for an SSE line split across network reads.
              let prev: string = '';
              function read() {
                reader!.read().then(({ done, value }: { done: boolean; value?: Uint8Array }) => {
                  try {
                    if (done) {
                      // Stream finished: emit the final assembled message and complete.
                      observer.next({
                        chunks,
                        message: getMessageFromChunks(chunks),
                        loading: false,
                      });
                      observer.complete();
                      return;
                    }
                    let lines: string[] = (prev + decoder.decode(value)).split('\n');
                    const lastLine: string = lines[lines.length - 1];
                    // A non-empty trailing line (other than the terminator) may be
                    // an incomplete SSE event; hold it back for the next read.
                    const isPartialChunk: boolean = !!lastLine && lastLine !== 'data: [DONE]';
                    if (isPartialChunk) {
                      prev = lastLine;
                      lines.pop();
                    } else {
                      prev = '';
                    }
                    // Strip the 6-char `data: ` prefix, then drop empty lines and
                    // the `[DONE]` terminator. (NOTE(review): `substr` is
                    // deprecated; `slice(6)` is the modern equivalent.)
                    lines = lines
                      .map((str) => str.substr(6))
                      .filter((str) => !!str && str !== '[DONE]');
                    const nextChunks: Chunk[] = lines.map((line) => JSON.parse(line));
                    // Emit once per chunk so the UI updates incrementally.
                    nextChunks.forEach((chunk) => {
                      chunks.push(chunk);
                      observer.next({
                        chunks,
                        message: getMessageFromChunks(chunks),
                        loading: true,
                      });
                    });
                  } catch (err) {
                    observer.error(err);
                    return;
                  }
                  read();
                });
              }
              read();
              // NOTE(review): this is the resolution value of the `.then`
              // callback, not an Observable teardown function, so the reader
              // is likely never cancelled on unsubscribe — confirm intent.
              return () => {
                reader.cancel();
              };
            })
            .catch((err) => {
              // Surface request failures through the observable state.
              observer.next({ chunks: [], error: err.message, loading: false });
            });
          // Throttle emissions (50ms apart) for a smoother typing effect.
        }).pipe(concatMap((value) => of(value).pipe(delay(50))))
      : new Observable<PromptObservableState>(() => {});
  }, [http, hasOpened, prompt, selectedConnectorId]);

  const response = useObservable(response$);

  // NOTE(review): this effect has an empty body — it appears to be a no-op
  // and can likely be removed.
  useEffect(() => {}, [response$]);

  // Derive a simple render state from the latest observable value.
  let content = response?.message ?? '';

  let state: 'init' | 'loading' | 'streaming' | 'error' | 'complete' = 'init';

  if (response?.loading) {
    state = content ? 'streaming' : 'loading';
  } else if (response && 'error' in response && response.error) {
    state = 'error';
    content = response.error;
  } else if (content) {
    state = 'complete';
  }

  let inner: React.ReactElement;

  if (state === 'complete' || state === 'streaming') {
    // NOTE(review): `cursorCss` is raw CSS text passed as `className`; an
    // emotion-generated class (or the `css` prop) may have been intended.
    inner = (
      <p style={{ whiteSpace: 'pre-wrap', lineHeight: 1.5 }}>
        {content}
        {state === 'streaming' ? <span className={cursorCss} /> : <></>}
      </p>
    );
  } else if (state === 'init' || state === 'loading') {
    inner = (
      <EuiFlexGroup direction="row" gutterSize="s" alignItems="center">
        <EuiFlexItem grow={false}>
          <EuiLoadingSpinner size="s" />
        </EuiFlexItem>
        <EuiFlexItem grow={false}>
          <EuiText size="s">
            {i18n.translate('xpack.observability.coPilotPrompt.chatLoading', {
              defaultMessage: 'Waiting for a response...',
            })}
          </EuiText>
        </EuiFlexItem>
      </EuiFlexGroup>
    );
  } else {
    // Error state: warning icon plus the error message.
    inner = (
      <EuiFlexGroup direction="row" gutterSize="s" alignItems="center">
        <EuiFlexItem grow={false}>
          <EuiIcon color="danger" type="warning" />
        </EuiFlexItem>
        <EuiFlexItem grow={false}>
          <EuiText size="s">{content}</EuiText>
        </EuiFlexItem>
      </EuiFlexGroup>
    );
  }

  return (
    <EuiPanel color="primary">
      <EuiAccordion
        id={`streamingResponse`}
        css={css`
          .euiButtonIcon {
            color: ${euiTheme.colors.primaryText};
          }
        `}
        buttonContent={
          <EuiFlexGroup direction="row" alignItems="center">
            <EuiFlexItem grow>
              <EuiText size="m" color={euiTheme.colors.primaryText}>
                <strong>
                  {i18n.translate(
                    'genAiStreamingResponseExample.app.component.streamingResponseTitle',
                    {
                      defaultMessage: 'Stream Response',
                    }
                  )}
                </strong>
              </EuiText>
            </EuiFlexItem>
          </EuiFlexGroup>
        }
        initialIsOpen={false}
        onToggle={() => {
          // The first toggle flips `hasOpened`, which triggers the request.
          setHasOpened(true);
        }}
      >
        <EuiSpacer size="s" />
        {inner}
      </EuiAccordion>
    </EuiPanel>
  );
};
/**
 * Concatenates the streamed delta contents of all chunks into the full
 * message text. Chunks with no first choice contribute nothing.
 */
function getMessageFromChunks(chunks: Chunk[]) {
  return chunks.reduce(
    (message, chunk) => message + (chunk.choices[0]?.delta.content ?? ''),
    ''
  );
}

View file

@ -0,0 +1,196 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import React, { useCallback, useEffect, useState } from 'react';
import {
  EuiPageBody,
  EuiPageHeader,
  EuiSpacer,
  EuiPageSection,
  EuiPage,
  EuiFlexItem,
  EuiFormRow,
  EuiTextArea,
  EuiText,
  EuiLink,
} from '@elastic/eui';
import {
  ActionType,
  TriggersAndActionsUIPublicPluginStart,
} from '@kbn/triggers-actions-ui-plugin/public';
import { CoreStart, HttpSetup } from '@kbn/core/public';
// NOTE(review): importing ConnectorAddModal/loadActionTypes from the
// `public/common/constants` path looks unusual — confirm this is the intended
// export path of the triggers_actions_ui plugin.
import {
  ConnectorAddModal,
  loadActionTypes,
} from '@kbn/triggers-actions-ui-plugin/public/common/constants';
import { i18n } from '@kbn/i18n';
import { SetupConnector } from './components/setup_connector';
import { ListConnectors } from './components/list_connector';
import { StreamingResponse } from './components/streaming_response';

// Max content width (px) for the page header and body section.
const width = 800;

export interface GenAiStreamingResponseExampleAppParams {
  http: CoreStart['http'];
  triggersActionsUi: TriggersAndActionsUIPublicPluginStart;
}

// Shape of each connector returned by the example's GET route.
export interface LoadConnectorResult {
  id: string;
  actionTypeId: string;
  name: string;
}

// Fetches the user's Generative AI connectors from the example server route.
const loadGenAiConnectors = async ({
  http,
}: {
  http: HttpSetup;
}): Promise<LoadConnectorResult[]> => {
  return await http.get(`/internal/examples/get_gen_ai_connectors`);
};

/**
 * Example app: lets the user pick (or create) a Generative AI connector,
 * enter a prompt, and view the connector's streaming response.
 */
export const GenAiStreamingResponseExampleApp = ({
  http,
  triggersActionsUi,
}: GenAiStreamingResponseExampleAppParams) => {
  const { actionTypeRegistry } = triggersActionsUi;

  // Fallback `.gen-ai` action-type definition used by the add-connector modal
  // until the real definition is loaded from the server in the effect below.
  const [genAiConnectorType, setGenAiConnectorType] = useState<ActionType>({
    enabledInConfig: true,
    enabledInLicense: true,
    isSystemActionType: false,
    minimumLicenseRequired: 'platinum',
    supportedFeatureIds: ['general'],
    id: '.gen-ai',
    name: 'Generative AI',
    enabled: true,
  });
  const [loading, setLoading] = useState<boolean>(true);
  const [connectors, setConnectors] = useState<LoadConnectorResult[]>([]);
  const [hasGenAiConnectors, setHasGenAiConnectors] = useState<boolean>(false);
  const [isConnectorModalVisible, setIsConnectorModalVisible] = useState<boolean>(false);
  const [selectedConnectorId, setSelectedConnectorId] = useState<string>('');
  const [prompt, setPrompt] = useState<string>();

  // (Re)loads the connector list; also invoked after the modal closes.
  const getConnectors = useCallback(async () => {
    const result = await loadGenAiConnectors({ http });
    setConnectors(result);
    setHasGenAiConnectors(result.length > 0);
  }, [http, setConnectors, setHasGenAiConnectors]);

  // Replace the hard-coded action type with the server's definition, if found.
  useEffect(() => {
    (async function () {
      const result = await loadActionTypes({ http });
      const ga = result?.find((at) => at.id === '.gen-ai');
      if (ga) {
        setGenAiConnectorType(ga);
      }
    })();
  }, [http, setConnectors]);

  // Initial connector load; clears the loading flag once complete.
  useEffect(() => {
    (async function () {
      await getConnectors();
      setLoading(false);
    })();
  }, [getConnectors]);

  const onPromptChange = useCallback(
    (e: React.ChangeEvent<HTMLTextAreaElement>) => setPrompt(e.target.value),
    [setPrompt]
  );
  const clearPrompt = useCallback(() => setPrompt(''), [setPrompt]);

  return (
    <EuiPage>
      <EuiPageBody>
        <EuiSpacer size="xl" />
        <EuiPageHeader
          paddingSize="l"
          restrictWidth={width}
          bottomBorder={`extended`}
          pageTitle={i18n.translate('genAiStreamingResponseExample.app.pageTitle', {
            defaultMessage: 'Gen AI Streaming Response Example',
          })}
        />
        <EuiPageSection restrictWidth={width} color={`plain`} grow={true}>
          {!loading && (
            <>
              {hasGenAiConnectors ? (
                <ListConnectors
                  connectors={connectors}
                  onConnectorSelect={setSelectedConnectorId}
                  setIsConnectorModalVisible={setIsConnectorModalVisible}
                />
              ) : (
                <SetupConnector setIsConnectorModalVisible={setIsConnectorModalVisible} />
              )}
              {isConnectorModalVisible && (
                <ConnectorAddModal
                  actionType={genAiConnectorType}
                  onClose={async () => {
                    // refetch the connectors
                    await getConnectors();
                    setIsConnectorModalVisible(false);
                  }}
                  actionTypeRegistry={actionTypeRegistry}
                />
              )}
              {selectedConnectorId.length > 0 && (
                <>
                  <EuiSpacer size="xl" />
                  <EuiFlexItem>
                    <EuiFormRow
                      fullWidth
                      label={i18n.translate('genAiStreamingResponseExample.app.userPromptLabel', {
                        defaultMessage: 'Enter a prompt',
                      })}
                      labelAppend={
                        <EuiText size="xs">
                          <EuiLink onClick={clearPrompt}>
                            {i18n.translate(
                              'genAiStreamingResponseExample.app.component.userPromptLabelAppend',
                              {
                                defaultMessage: 'Clear prompt',
                              }
                            )}
                          </EuiLink>
                        </EuiText>
                      }
                    >
                      <EuiTextArea
                        placeholder={i18n.translate(
                          'genAiStreamingResponseExample.app.component.textPlaceholder',
                          {
                            defaultMessage: 'Ask a question and get a streaming response',
                          }
                        )}
                        value={prompt}
                        fullWidth
                        onChange={onPromptChange}
                      />
                    </EuiFormRow>
                  </EuiFlexItem>
                  <EuiSpacer size={'m'} />
                  <EuiFlexItem>
                    {prompt && selectedConnectorId.length > 0 && (
                      <StreamingResponse
                        http={http}
                        prompt={prompt}
                        selectedConnectorId={selectedConnectorId}
                      />
                    )}
                  </EuiFlexItem>
                </>
              )}
            </>
          )}
        </EuiPageSection>
      </EuiPageBody>
    </EuiPage>
  );
};

View file

@ -0,0 +1,10 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { GenAiStreamingResponseExamplePlugin } from './plugin';

// Browser-side plugin entry point: Kibana's plugin service calls this to
// instantiate the public plugin.
export const plugin = () => new GenAiStreamingResponseExamplePlugin();

View file

@ -0,0 +1,59 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { Plugin, CoreSetup, AppMountParameters, AppNavLinkStatus } from '@kbn/core/public';
import { PluginSetupContract as AlertingSetup } from '@kbn/alerting-plugin/public';
import { ChartsPluginStart } from '@kbn/charts-plugin/public';
import {
  TriggersAndActionsUIPublicPluginSetup,
  TriggersAndActionsUIPublicPluginStart,
} from '@kbn/triggers-actions-ui-plugin/public';
import { DataPublicPluginStart } from '@kbn/data-plugin/public';
import { DeveloperExamplesSetup } from '@kbn/developer-examples-plugin/public';

// Plugin contracts this example depends on during `setup`.
export interface GenAiStreamingResponseExamplePublicSetupDeps {
  alerting: AlertingSetup;
  triggersActionsUi: TriggersAndActionsUIPublicPluginSetup;
  developerExamples: DeveloperExamplesSetup;
}

// Plugin contracts available at `start`; passed through to renderApp.
export interface GenAiStreamingResponseExamplePublicStartDeps {
  alerting: AlertingSetup;
  triggersActionsUi: TriggersAndActionsUIPublicPluginStart;
  charts: ChartsPluginStart;
  data: DataPublicPluginStart;
}

/**
 * Public (browser) side of the example plugin. Registers a hidden Kibana app
 * plus an entry in the Developer Examples gallery that links to it.
 */
export class GenAiStreamingResponseExamplePlugin
  implements Plugin<void, void, GenAiStreamingResponseExamplePublicSetupDeps>
{
  public setup(
    core: CoreSetup<GenAiStreamingResponseExamplePublicStartDeps, void>,
    { developerExamples }: GenAiStreamingResponseExamplePublicSetupDeps
  ) {
    core.application.register({
      id: 'GenAiStreamingResponseExample',
      title: 'Generative AI Streaming Response Example',
      // Hidden from the side nav; reachable via Developer Examples or URL.
      navLinkStatus: AppNavLinkStatus.hidden,
      async mount(params: AppMountParameters) {
        // Resolve start services and lazy-load the app bundle at mount time.
        const [coreStart, depsStart] = await core.getStartServices();
        const { renderApp } = await import('./application');
        return renderApp(coreStart, depsStart, params);
      },
    });

    developerExamples.register({
      appId: 'GenAiStreamingResponseExample',
      title: 'Generative AI Streaming Response Example',
      description:
        'This example demonstrates how to stream a response from a Generative AI connector',
    });
  }

  public start() {}

  public stop() {}
}

View file

@ -0,0 +1,12 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { PluginInitializer } from '@kbn/core/server';
import { GenAiStreamingResponseExamplePlugin } from './plugin';

// Server-side plugin entry point: Kibana's plugin service calls this
// initializer to construct the plugin instance.
export const plugin: PluginInitializer<void, void> = () =>
  new GenAiStreamingResponseExamplePlugin();

View file

@ -0,0 +1,123 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import Boom from '@hapi/boom';
import { CreateChatCompletionResponse } from 'openai';
import { Readable } from 'stream';
import { Plugin, CoreSetup } from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { PluginStartContract as ActionsPluginStart } from '@kbn/actions-plugin/server';

// Start contracts needed when handling requests.
interface GenAiStreamingResponseExamplePluginStart {
  actions: ActionsPluginStart;
}

// A single chat message in the OpenAI chat-completions format.
interface Message {
  role: string;
  content: string;
}

// Request body forwarded to the connector. `model` is only set for the
// OpenAI provider; for Azure OpenAI the deployment (model) is part of the
// connector's configured URL.
interface MessageBody {
  model?: string;
  messages: Message[];
}

/**
 * Server side of the example plugin. Registers two internal routes:
 *  - GET  /internal/examples/get_gen_ai_connectors — list `.gen-ai` connectors
 *  - POST /internal/examples/execute_gen_ai_connector — run the connector's
 *    `stream` subaction with `stream: true` and pass the result through
 */
export class GenAiStreamingResponseExamplePlugin implements Plugin<void, void> {
  public setup({ http, getStartServices }: CoreSetup<GenAiStreamingResponseExamplePluginStart>) {
    const router = http.createRouter();

    router.get(
      {
        path: `/internal/examples/get_gen_ai_connectors`,
        validate: {},
      },
      async (_, request, response) => {
        const [, { actions }] = await getStartServices();
        // Actions client scoped to the requesting user's privileges.
        const actionsClient = await actions.getActionsClientWithRequest(request);
        const allConnectors = await actionsClient.getAll();
        return response.ok({
          // Only user-created Generative AI connectors; preconfigured
          // connectors are excluded.
          body: (allConnectors ?? []).filter(
            (connector) => connector.actionTypeId === '.gen-ai' && !connector.isPreconfigured
          ),
        });
      }
    );

    router.post(
      {
        path: `/internal/examples/execute_gen_ai_connector`,
        validate: {
          body: schema.object({
            connector_id: schema.string(),
            prompt: schema.string(),
          }),
        },
      },
      async (_, request, response) => {
        const [, { actions }] = await getStartServices();
        const actionsClient = await actions.getActionsClientWithRequest(request);
        const connector = await actionsClient.get({ id: request.body.connector_id });

        // Build the provider-specific request body from the user prompt.
        let messageBody: MessageBody;
        if (connector.config?.apiProvider === 'OpenAI') {
          messageBody = {
            // NOTE(review): model is hard-coded for the example — confirm
            // 'gpt-3.5-turbo' is the intended default.
            model: 'gpt-3.5-turbo',
            messages: [
              {
                role: 'user',
                content: request.body.prompt,
              },
            ],
          };
        } else if (connector.config?.apiProvider === 'Azure OpenAI') {
          messageBody = {
            messages: [
              {
                role: 'user',
                content: request.body.prompt,
              },
            ],
          };
        } else {
          throw Boom.badRequest(
            `Invalid generative AI connector selected - ${connector.config?.apiProvider} is not a valid provider`
          );
        }

        // Execute the `stream` subaction with streaming explicitly enabled.
        const executeResult = await actionsClient.execute({
          actionId: request.body.connector_id,
          params: {
            subAction: 'stream',
            subActionParams: {
              body: JSON.stringify(messageBody),
              stream: true,
            },
          },
        });

        // Framework-level failures are returned as status 'error' rather than
        // thrown; convert them into a 500 for the client.
        if (executeResult?.status === 'error') {
          return response.customError({
            statusCode: 500,
            body: {
              message: `${executeResult?.message} - ${executeResult?.serviceMessage}`,
            },
          });
        }

        // Pass the result through: a Readable stream when streaming, or a
        // parsed chat-completion response otherwise.
        return response.ok({
          body: executeResult.data as CreateChatCompletionResponse | Readable,
        });
      }
    );
  }

  public start() {}

  public stop() {}
}

View file

@ -0,0 +1,29 @@
{
"extends": "../../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types"
},
"include": [
"index.ts",
"public/**/*.ts",
"public/**/*.tsx",
"server/**/*.ts",
"../../../typings/**/*",
],
"exclude": [
"target/**/*",
],
"kbn_references": [
"@kbn/core",
"@kbn/kibana-react-plugin",
"@kbn/triggers-actions-ui-plugin",
"@kbn/developer-examples-plugin",
"@kbn/i18n",
"@kbn/config-schema",
"@kbn/stack-connectors-plugin",
"@kbn/alerting-plugin",
"@kbn/charts-plugin",
"@kbn/data-plugin",
"@kbn/actions-plugin",
]
}

View file

@ -16,10 +16,21 @@ export const GEN_AI_TITLE = i18n.translate(
export const GEN_AI_CONNECTOR_ID = '.gen-ai';

// Subactions supported by the Generative AI connector.
export enum SUB_ACTION {
  RUN = 'run',
  // Like `run`, but may return a streaming (SSE) response when requested.
  STREAM = 'stream',
  DASHBOARD = 'getDashboard',
  TEST = 'test',
}

// Supported API providers; values must match the strings stored in the
// connector's `apiProvider` config.
export enum OpenAiProviderType {
  OpenAi = 'OpenAI',
  AzureAi = 'Azure OpenAI',
}

// Known endpoint URL templates for each provider.
export const OPENAI_CHAT_URL = 'https://api.openai.com/v1/chat/completions' as const;
export const OPENAI_LEGACY_COMPLETION_URL = 'https://api.openai.com/v1/completions' as const;
export const AZURE_OPENAI_CHAT_URL =
  'https://{your-resource-name}.openai.azure.com/openai/deployments/{deployment-id}/chat/completions?api-version={api-version}' as const;
export const AZURE_OPENAI_COMPLETIONS_URL =
  'https://{your-resource-name}.openai.azure.com/openai/deployments/{deployment-id}/completions?api-version={api-version}' as const;
export const AZURE_OPENAI_COMPLETIONS_EXTENSIONS_URL =
  'https://{your-resource-name}.openai.azure.com/openai/deployments/{deployment-id}/extensions/chat/completions?api-version={api-version}' as const;

View file

@ -6,10 +6,14 @@
*/
import { schema } from '@kbn/config-schema';
import { OpenAiProviderType } from './constants';
// Connector schema
// Configuration for a Generative AI connector: the provider must be one of
// the supported OpenAI flavors and `apiUrl` points at its endpoint.
// (The diff rendering interleaved the removed `apiProvider: schema.string()`
// line with its `oneOf` replacement, producing a duplicate key; this is the
// post-change state.)
export const GenAiConfigSchema = schema.object({
  apiProvider: schema.oneOf([
    schema.literal(OpenAiProviderType.OpenAi as string),
    schema.literal(OpenAiProviderType.AzureAi as string),
  ]),
  apiUrl: schema.string(),
});
@ -20,6 +24,13 @@ export const GenAiRunActionParamsSchema = schema.object({
body: schema.string(),
});
// Execute action schema
// Params for the `stream` subaction: the raw JSON request body plus an
// explicit `stream` flag (default false) that overrides any value in the body.
export const GenAiStreamActionParamsSchema = schema.object({
  body: schema.string(),
  stream: schema.boolean({ defaultValue: false }),
});

// Streaming responses are passed through unvalidated (SSE stream or JSON).
export const GenAiStreamingResponseSchema = schema.any();
export const GenAiRunActionResponseSchema = schema.object(
{
id: schema.string(),

View file

@ -13,6 +13,7 @@ import {
GenAiRunActionResponseSchema,
GenAiDashboardActionParamsSchema,
GenAiDashboardActionResponseSchema,
GenAiStreamActionParamsSchema,
} from './schema';
export type GenAiConfig = TypeOf<typeof GenAiConfigSchema>;
@ -21,3 +22,4 @@ export type GenAiRunActionParams = TypeOf<typeof GenAiRunActionParamsSchema>;
// Inferred TS types for the Generative AI connector subaction params/responses.
export type GenAiRunActionResponse = TypeOf<typeof GenAiRunActionResponseSchema>;
export type GenAiDashboardActionParams = TypeOf<typeof GenAiDashboardActionParamsSchema>;
export type GenAiDashboardActionResponse = TypeOf<typeof GenAiDashboardActionResponseSchema>;
export type GenAiStreamActionParams = TypeOf<typeof GenAiStreamActionParamsSchema>;

View file

@ -10,61 +10,202 @@ import { actionsConfigMock } from '@kbn/actions-plugin/server/actions_config.moc
import { GEN_AI_CONNECTOR_ID, OpenAiProviderType } from '../../../common/gen_ai/constants';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { actionsMock } from '@kbn/actions-plugin/server/mocks';
import { GenAiRunActionResponseSchema } from '../../../common/gen_ai/schema';
import {
GenAiRunActionResponseSchema,
GenAiStreamingResponseSchema,
} from '../../../common/gen_ai/schema';
import { initGenAiDashboard } from './create_dashboard';
jest.mock('./create_dashboard');
describe('GenAiConnector', () => {
const sampleBody = JSON.stringify({
model: 'gpt-3.5-turbo',
messages: [
{
role: 'user',
content: 'Hello world',
},
],
});
const mockResponse = { data: { result: 'success' } };
const mockRequest = jest.fn().mockResolvedValue(mockResponse);
const mockError = jest.fn().mockImplementation(() => {
throw new Error('API Error');
let mockRequest: jest.Mock;
let mockError: jest.Mock;
beforeEach(() => {
const mockResponse = { headers: {}, data: { result: 'success' } };
mockRequest = jest.fn().mockResolvedValue(mockResponse);
mockError = jest.fn().mockImplementation(() => {
throw new Error('API Error');
});
});
describe('OpenAI', () => {
const connector = new GenAiConnector({
configurationUtilities: actionsConfigMock.create(),
connector: { id: '1', type: GEN_AI_CONNECTOR_ID },
config: { apiUrl: 'https://example.com/api', apiProvider: OpenAiProviderType.OpenAi },
config: {
apiUrl: 'https://api.openai.com/v1/chat/completions',
apiProvider: OpenAiProviderType.OpenAi,
},
secrets: { apiKey: '123' },
logger: loggingSystemMock.createLogger(),
services: actionsMock.createServices(),
});
const sampleOpenAiBody = {
model: 'gpt-3.5-turbo',
messages: [
{
role: 'user',
content: 'Hello world',
},
],
};
beforeEach(() => {
// @ts-ignore
connector.request = mockRequest;
jest.clearAllMocks();
});
it('the OpenAI API call is successful with correct parameters', async () => {
const response = await connector.runApi({ body: sampleBody });
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
url: 'https://example.com/api',
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: sampleBody,
headers: {
Authorization: 'Bearer 123',
'content-type': 'application/json',
},
describe('runApi', () => {
it('the OpenAI API call is successful with correct parameters', async () => {
const response = await connector.runApi({ body: JSON.stringify(sampleOpenAiBody) });
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
url: 'https://api.openai.com/v1/chat/completions',
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: JSON.stringify({ ...sampleOpenAiBody, stream: false }),
headers: {
Authorization: 'Bearer 123',
'content-type': 'application/json',
},
});
expect(response).toEqual({ result: 'success' });
});
it('overrides stream parameter if set in the body', async () => {
const body = {
model: 'gpt-3.5-turbo',
messages: [
{
role: 'user',
content: 'Hello world',
},
],
};
const response = await connector.runApi({
body: JSON.stringify({
...body,
stream: true,
}),
});
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
url: 'https://api.openai.com/v1/chat/completions',
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: JSON.stringify({
...body,
stream: false,
}),
headers: {
Authorization: 'Bearer 123',
'content-type': 'application/json',
},
});
expect(response).toEqual({ result: 'success' });
});
it('errors during API calls are properly handled', async () => {
// @ts-ignore
connector.request = mockError;
await expect(connector.runApi({ body: JSON.stringify(sampleOpenAiBody) })).rejects.toThrow(
'API Error'
);
});
expect(response).toEqual({ result: 'success' });
});
it('errors during API calls are properly handled', async () => {
// @ts-ignore
connector.request = mockError;
describe('streamApi', () => {
it('the OpenAI API call is successful with correct parameters when stream = false', async () => {
const response = await connector.streamApi({
body: JSON.stringify(sampleOpenAiBody),
stream: false,
});
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
url: 'https://api.openai.com/v1/chat/completions',
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: JSON.stringify({ ...sampleOpenAiBody, stream: false }),
headers: {
Authorization: 'Bearer 123',
'content-type': 'application/json',
},
});
expect(response).toEqual({ result: 'success' });
});
await expect(connector.runApi({ body: sampleBody })).rejects.toThrow('API Error');
it('the OpenAI API call is successful with correct parameters when stream = true', async () => {
const response = await connector.streamApi({
body: JSON.stringify(sampleOpenAiBody),
stream: true,
});
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
responseType: 'stream',
url: 'https://api.openai.com/v1/chat/completions',
method: 'post',
responseSchema: GenAiStreamingResponseSchema,
data: JSON.stringify({ ...sampleOpenAiBody, stream: true }),
headers: {
Authorization: 'Bearer 123',
'content-type': 'application/json',
},
});
expect(response).toEqual({
headers: { 'Content-Type': 'dont-compress-this' },
result: 'success',
});
});
it('overrides stream parameter if set in the body with explicit stream parameter', async () => {
const body = {
model: 'gpt-3.5-turbo',
messages: [
{
role: 'user',
content: 'Hello world',
},
],
};
const response = await connector.streamApi({
body: JSON.stringify({
...body,
stream: false,
}),
stream: true,
});
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
responseType: 'stream',
url: 'https://api.openai.com/v1/chat/completions',
method: 'post',
responseSchema: GenAiStreamingResponseSchema,
data: JSON.stringify({
...body,
stream: true,
}),
headers: {
Authorization: 'Bearer 123',
'content-type': 'application/json',
},
});
expect(response).toEqual({
headers: { 'Content-Type': 'dont-compress-this' },
result: 'success',
});
});
it('errors during API calls are properly handled', async () => {
// @ts-ignore
connector.request = mockError;
await expect(
connector.streamApi({ body: JSON.stringify(sampleOpenAiBody), stream: true })
).rejects.toThrow('API Error');
});
});
});
@ -72,30 +213,169 @@ describe('GenAiConnector', () => {
const connector = new GenAiConnector({
configurationUtilities: actionsConfigMock.create(),
connector: { id: '1', type: GEN_AI_CONNECTOR_ID },
config: { apiUrl: 'https://example.com/api', apiProvider: OpenAiProviderType.AzureAi },
config: {
apiUrl:
'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions?api-version=2023-05-15',
apiProvider: OpenAiProviderType.AzureAi,
},
secrets: { apiKey: '123' },
logger: loggingSystemMock.createLogger(),
services: actionsMock.createServices(),
});
const sampleAzureAiBody = {
messages: [
{
role: 'user',
content: 'Hello world',
},
],
};
beforeEach(() => {
// @ts-ignore
connector.request = mockRequest;
jest.clearAllMocks();
});
it('the AzureAI API call is successful with correct parameters', async () => {
const response = await connector.runApi({ body: sampleBody });
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
url: 'https://example.com/api',
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: sampleBody,
headers: {
'api-key': '123',
'content-type': 'application/json',
},
describe('runApi', () => {
it('test the AzureAI API call is successful with correct parameters', async () => {
const response = await connector.runApi({ body: JSON.stringify(sampleAzureAiBody) });
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
url: 'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions?api-version=2023-05-15',
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: JSON.stringify({ ...sampleAzureAiBody, stream: false }),
headers: {
'api-key': '123',
'content-type': 'application/json',
},
});
expect(response).toEqual({ result: 'success' });
});
it('overrides stream parameter if set in the body', async () => {
const body = {
messages: [
{
role: 'user',
content: 'Hello world',
},
],
};
const response = await connector.runApi({
body: JSON.stringify({ ...body, stream: true }),
});
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
url: 'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions?api-version=2023-05-15',
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: JSON.stringify({ ...sampleAzureAiBody, stream: false }),
headers: {
'api-key': '123',
'content-type': 'application/json',
},
});
expect(response).toEqual({ result: 'success' });
});
it('errors during API calls are properly handled', async () => {
// @ts-ignore
connector.request = mockError;
await expect(connector.runApi({ body: JSON.stringify(sampleAzureAiBody) })).rejects.toThrow(
'API Error'
);
});
});
describe('streamApi', () => {
it('the AzureAI API call is successful with correct parameters when stream = false', async () => {
const response = await connector.streamApi({
body: JSON.stringify(sampleAzureAiBody),
stream: false,
});
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
url: 'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions?api-version=2023-05-15',
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: JSON.stringify({ ...sampleAzureAiBody, stream: false }),
headers: {
'api-key': '123',
'content-type': 'application/json',
},
});
expect(response).toEqual({ result: 'success' });
});
it('the AzureAI API call is successful with correct parameters when stream = true', async () => {
const response = await connector.streamApi({
body: JSON.stringify(sampleAzureAiBody),
stream: true,
});
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
responseType: 'stream',
url: 'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions?api-version=2023-05-15',
method: 'post',
responseSchema: GenAiStreamingResponseSchema,
data: JSON.stringify({ ...sampleAzureAiBody, stream: true }),
headers: {
'api-key': '123',
'content-type': 'application/json',
},
});
expect(response).toEqual({
headers: { 'Content-Type': 'dont-compress-this' },
result: 'success',
});
});
it('overrides stream parameter if set in the body with explicit stream parameter', async () => {
const body = {
messages: [
{
role: 'user',
content: 'Hello world',
},
],
};
const response = await connector.streamApi({
body: JSON.stringify({ ...body, stream: false }),
stream: true,
});
expect(mockRequest).toBeCalledTimes(1);
expect(mockRequest).toHaveBeenCalledWith({
responseType: 'stream',
url: 'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions?api-version=2023-05-15',
method: 'post',
responseSchema: GenAiStreamingResponseSchema,
data: JSON.stringify({
...body,
stream: true,
}),
headers: {
'api-key': '123',
'content-type': 'application/json',
},
});
expect(response).toEqual({
headers: { 'Content-Type': 'dont-compress-this' },
result: 'success',
});
});
it('errors during API calls are properly handled', async () => {
// @ts-ignore
connector.request = mockError;
await expect(
connector.streamApi({ body: JSON.stringify(sampleAzureAiBody), stream: true })
).rejects.toThrow('API Error');
});
expect(response).toEqual({ result: 'success' });
});
});

View file

@ -12,18 +12,27 @@ import {
GenAiRunActionParamsSchema,
GenAiRunActionResponseSchema,
GenAiDashboardActionParamsSchema,
GenAiStreamActionParamsSchema,
GenAiStreamingResponseSchema,
} from '../../../common/gen_ai/schema';
import type {
GenAiConfig,
GenAiSecrets,
GenAiRunActionParams,
GenAiRunActionResponse,
GenAiStreamActionParams,
} from '../../../common/gen_ai/types';
import { OpenAiProviderType, SUB_ACTION } from '../../../common/gen_ai/constants';
import { SUB_ACTION } from '../../../common/gen_ai/constants';
import {
GenAiDashboardActionParams,
GenAiDashboardActionResponse,
} from '../../../common/gen_ai/types';
import {
getAxiosOptions,
getRequestWithStreamOption,
pipeStreamingResponse,
sanitizeRequest,
} from './lib/utils';
export class GenAiConnector extends SubActionConnector<GenAiConfig, GenAiSecrets> {
private url;
@ -53,6 +62,12 @@ export class GenAiConnector extends SubActionConnector<GenAiConfig, GenAiSecrets
schema: GenAiRunActionParamsSchema,
});
this.registerSubAction({
name: SUB_ACTION.STREAM,
method: 'streamApi',
schema: GenAiStreamActionParamsSchema,
});
this.registerSubAction({
name: SUB_ACTION.DASHBOARD,
method: 'getDashboard',
@ -71,21 +86,34 @@ export class GenAiConnector extends SubActionConnector<GenAiConfig, GenAiSecrets
}
public async runApi({ body }: GenAiRunActionParams): Promise<GenAiRunActionResponse> {
const sanitizedBody = sanitizeRequest(this.provider, this.url, body);
const axiosOptions = getAxiosOptions(this.provider, this.key, false);
const response = await this.request({
url: this.url,
method: 'post',
responseSchema: GenAiRunActionResponseSchema,
data: body,
headers: {
...(this.provider === OpenAiProviderType.OpenAi
? { Authorization: `Bearer ${this.key}` }
: { ['api-key']: this.key }),
['content-type']: 'application/json',
},
data: sanitizedBody,
...axiosOptions,
});
return response.data;
}
/**
 * Executes the Gen AI request with an explicit `stream` option.
 *
 * @param body   JSON string payload forwarded to the provider API; its
 *               `stream` field is overridden to match the `stream` argument.
 * @param stream when true, the provider response is returned as a raw
 *               streaming body instead of a parsed JSON result.
 * @returns the parsed response data, or (when streaming) the piped response
 *          stream prepared by `pipeStreamingResponse`.
 */
public async streamApi({
body,
stream,
}: GenAiStreamActionParams): Promise<GenAiRunActionResponse> {
// Force the stream flag in the outgoing body so the payload cannot disagree
// with the requested response handling.
const executeBody = getRequestWithStreamOption(this.provider, this.url, body, stream);
// Provider-specific auth headers; adds responseType: 'stream' when streaming.
const axiosOptions = getAxiosOptions(this.provider, this.key, stream);
const response = await this.request({
url: this.url,
method: 'post',
// Streaming responses are validated with a dedicated schema.
responseSchema: stream ? GenAiStreamingResponseSchema : GenAiRunActionResponseSchema,
data: executeBody,
...axiosOptions,
});
return stream ? pipeStreamingResponse(response) : response.data;
}
public async getDashboard({
dashboardId,
}: GenAiDashboardActionParams): Promise<GenAiDashboardActionResponse> {

View file

@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
sanitizeRequest,
getRequestWithStreamOption,
transformApiUrlToRegex,
} from './azure_openai_utils';
import {
AZURE_OPENAI_CHAT_URL,
AZURE_OPENAI_COMPLETIONS_URL,
AZURE_OPENAI_COMPLETIONS_EXTENSIONS_URL,
} from '../../../../common/gen_ai/constants';
// Covers the Azure OpenAI helpers that force/override the `stream` flag in
// outgoing request bodies, exercised against the three documented endpoint
// shapes (chat, completions, completions-extensions) that accept `stream`.
describe('Azure Open AI Utils', () => {
const chatUrl =
'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions?api-version=2023-05-15';
const completionUrl =
'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/completions?api-version=2023-05-15';
const completionExtensionsUrl =
'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/extensions/chat/completions?api-version=2023-05-15';
// sanitizeRequest must always end up with stream=false, whatever the input says.
describe('sanitizeRequest', () => {
it('sets stream to false when stream is set to true in the body', () => {
const body = {
stream: true,
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[chatUrl, completionUrl, completionExtensionsUrl].forEach((url: string) => {
const sanitizedBodyString = sanitizeRequest(url, JSON.stringify(body));
expect(sanitizedBodyString).toEqual(
`{\"stream\":false,\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`
);
});
});
it('sets stream to false when stream is not defined in the body', () => {
const body = {
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[chatUrl, completionUrl, completionExtensionsUrl].forEach((url: string) => {
const sanitizedBodyString = sanitizeRequest(url, JSON.stringify(body));
expect(sanitizedBodyString).toEqual(
`{\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}],\"stream\":false}`
);
});
});
it('sets stream to false when stream is set to false in the body', () => {
const body = {
stream: false,
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[chatUrl, completionUrl, completionExtensionsUrl].forEach((url: string) => {
const sanitizedBodyString = sanitizeRequest(url, JSON.stringify(body));
expect(sanitizedBodyString).toEqual(
`{\"stream\":false,\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`
);
});
});
// Malformed JSON and non-streaming URLs must pass through untouched.
it('does nothing when body is malformed JSON', () => {
const bodyString = `{\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}],,}`;
[chatUrl, completionUrl, completionExtensionsUrl].forEach((url: string) => {
const sanitizedBodyString = sanitizeRequest(url, bodyString);
expect(sanitizedBodyString).toEqual(bodyString);
});
});
it('does nothing when url does not accept stream parameter', () => {
const bodyString = `{\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`;
const sanitizedBodyString = sanitizeRequest('https://randostring.ai', bodyString);
expect(sanitizedBodyString).toEqual(bodyString);
});
});
// getRequestWithStreamOption sets `stream` to the caller-supplied value,
// overriding whatever the body contains.
describe('getRequestWithStreamOption', () => {
it('sets stream parameter when stream is not defined in the body', () => {
const body = {
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[chatUrl, completionUrl, completionExtensionsUrl].forEach((url: string) => {
const sanitizedBodyString = getRequestWithStreamOption(url, JSON.stringify(body), true);
expect(sanitizedBodyString).toEqual(
`{\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}],\"stream\":true}`
);
});
});
it('overrides stream parameter if defined in body', () => {
const body = {
stream: true,
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[chatUrl, completionUrl, completionExtensionsUrl].forEach((url: string) => {
const sanitizedBodyString = getRequestWithStreamOption(url, JSON.stringify(body), false);
expect(sanitizedBodyString).toEqual(
`{\"stream\":false,\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`
);
});
});
it('does nothing when body is malformed JSON', () => {
const bodyString = `{\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}],,}`;
[chatUrl, completionUrl, completionExtensionsUrl].forEach((url: string) => {
const sanitizedBodyString = getRequestWithStreamOption(url, bodyString, false);
expect(sanitizedBodyString).toEqual(bodyString);
});
});
it('does nothing when url does not accept stream parameter', () => {
const bodyString = `{\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`;
const sanitizedBodyString = getRequestWithStreamOption(
'https://randostring.ai',
bodyString,
true
);
expect(sanitizedBodyString).toEqual(bodyString);
});
});
// transformApiUrlToRegex converts the documented URL templates into regexes
// that must match real deployment URLs and reject malformed ones.
describe('transformApiUrlToRegex', () => {
it('should match valid chat url', () => {
const regex = transformApiUrlToRegex(AZURE_OPENAI_CHAT_URL);
const match = chatUrl.match(regex);
expect(match).not.toBeNull();
expect(match![0]).toEqual(
'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions'
);
});
it('should match valid completion url', () => {
const regex = transformApiUrlToRegex(AZURE_OPENAI_COMPLETIONS_URL);
const match = completionUrl.match(regex);
expect(match).not.toBeNull();
expect(match![0]).toEqual(
'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/completions'
);
});
it('should match valid completion extensions url', () => {
const regex = transformApiUrlToRegex(AZURE_OPENAI_COMPLETIONS_EXTENSIONS_URL);
const match = completionExtensionsUrl.match(regex);
expect(match).not.toBeNull();
expect(match![0]).toEqual(
'https://My-test-resource-123.openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/extensions/chat/completions'
);
});
it('should not match invalid chat url', () => {
const regex = transformApiUrlToRegex(AZURE_OPENAI_CHAT_URL);
const match =
'https://openai.azure.com/openai/deployments/NEW-DEPLOYMENT-321/chat/completions?api-version=2023-05-15'.match(
regex
);
expect(match).toBeNull();
});
it('should not match invalid completion url', () => {
const regex = transformApiUrlToRegex(AZURE_OPENAI_COMPLETIONS_URL);
const match = 'https://fooai.com/completions?api-version=2023-05-15'.match(regex);
expect(match).toBeNull();
});
it('should not match invalid completion extensions url', () => {
const regex = transformApiUrlToRegex(AZURE_OPENAI_COMPLETIONS_EXTENSIONS_URL);
const match =
'https://My-test-resource-123.openai.azure.com/openai/deployments/extensions/chat/completions?api-version=2023-05-15'.match(
regex
);
expect(match).toBeNull();
});
});
});

View file

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
AZURE_OPENAI_CHAT_URL,
AZURE_OPENAI_COMPLETIONS_URL,
AZURE_OPENAI_COMPLETIONS_EXTENSIONS_URL,
} from '../../../../common/gen_ai/constants';
// URL templates of the Azure OpenAI endpoints that accept a `stream` request
// parameter; compared against concrete URLs via transformApiUrlToRegex below.
const APIS_ALLOWING_STREAMING = new Set<string>([
AZURE_OPENAI_CHAT_URL,
AZURE_OPENAI_COMPLETIONS_URL,
AZURE_OPENAI_COMPLETIONS_EXTENSIONS_URL,
]);
/**
 * Forces `stream: false` in an Azure OpenAI request body so users cannot
 * request a streaming response when the framework is not prepared to
 * handle one.
 *
 * Only the Chat API, the Completion API and the Completions Extensions API
 * accept the `stream` parameter; any other URL passes through unchanged.
 */
export const sanitizeRequest = (url: string, body: string): string =>
  getRequestWithStreamOption(url, body, false);
/**
 * Rewrites the Azure OpenAI request body so its `stream` field matches the
 * caller-supplied value, overriding anything already present in the payload.
 *
 * Only the Chat API, the Completion API and the Completions Extensions API
 * accept the `stream` parameter; URLs matching none of those templates —
 * and bodies that are not valid JSON — are returned untouched.
 */
export const getRequestWithStreamOption = (url: string, body: string, stream: boolean): string => {
  // Does the target URL correspond to one of the streaming-capable endpoints?
  const streamingAllowed = Array.from(APIS_ALLOWING_STREAMING).some(
    (apiUrl: string) => url.match(transformApiUrlToRegex(apiUrl)) != null
  );
  if (!streamingAllowed) {
    return body;
  }
  try {
    const payload = JSON.parse(body);
    if (payload) {
      payload.stream = stream;
    }
    return JSON.stringify(payload);
  } catch (err) {
    // Body is not parseable JSON — fall through and return it unchanged.
  }
  return body;
};
/**
 * Converts a documented Azure OpenAI URL template, e.g.
 * `https://{your-resource-name}.openai.azure.com/openai/deployments/{deployment-id}/chat/completions?api-version={api-version}`,
 * into a RegExp that matches concrete request URLs.
 *
 * Bug fix: the previous implementation used `` replaceAll(`.`, `\.`) `` and
 * `` replaceAll(`/`, `\/`) ``, but in a string literal `'\.'` === `'.'` and
 * `'\/'` === `'/'`, so both calls were no-ops. Dots therefore remained
 * unescaped regex metacharacters and the pattern matched URLs it should
 * reject (e.g. `openaiXazureXcom`). Dots are now escaped for real; slashes
 * need no escaping inside the RegExp constructor.
 */
export const transformApiUrlToRegex = (apiUrl: string): RegExp => {
  return new RegExp(
    apiUrl
      // Escape literal dots before inserting the wildcard character classes.
      .replace(/\./g, `\\.`)
      // Resource name: any non-empty run of characters without a dot.
      .replace(`{your-resource-name}`, `[^.]+`)
      // Deployment id: any non-empty run of characters without a slash.
      .replace(`{deployment-id}`, `[^/]+`)
      // The query string is not part of the match.
      .replace(`?api-version={api-version}`, ``),
    // 'g' kept for interface compatibility; callers use String#match.
    'g'
  );
};

View file

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { sanitizeRequest, getRequestWithStreamOption } from './openai_utils';
import { OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL } from '../../../../common/gen_ai/constants';
// Covers the OpenAI helpers that force/override the `stream` flag in outgoing
// request bodies, exercised against the two endpoints that accept `stream`.
describe('Open AI Utils', () => {
// sanitizeRequest must always end up with stream=false, whatever the input says.
describe('sanitizeRequest', () => {
it('sets stream to false when stream is set to true in the body', () => {
const body = {
model: 'gpt-4',
stream: true,
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL].forEach((url: string) => {
const sanitizedBodyString = sanitizeRequest(url, JSON.stringify(body));
expect(sanitizedBodyString).toEqual(
`{\"model\":\"gpt-4\",\"stream\":false,\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`
);
});
});
it('sets stream to false when stream is not defined in the body', () => {
const body = {
model: 'gpt-4',
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL].forEach((url: string) => {
const sanitizedBodyString = sanitizeRequest(url, JSON.stringify(body));
expect(sanitizedBodyString).toEqual(
`{\"model\":\"gpt-4\",\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}],\"stream\":false}`
);
});
});
it('sets stream to false when stream is set to false in the body', () => {
const body = {
model: 'gpt-4',
stream: false,
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL].forEach((url: string) => {
const sanitizedBodyString = sanitizeRequest(url, JSON.stringify(body));
expect(sanitizedBodyString).toEqual(
`{\"model\":\"gpt-4\",\"stream\":false,\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`
);
});
});
// Malformed JSON and non-streaming URLs must pass through untouched.
it('does nothing when body is malformed JSON', () => {
const bodyString = `{\"model\":\"gpt-4\",\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}],,}`;
[OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL].forEach((url: string) => {
const sanitizedBodyString = sanitizeRequest(url, bodyString);
expect(sanitizedBodyString).toEqual(bodyString);
});
});
it('does nothing when url does not accept stream parameter', () => {
const bodyString = `{\"model\":\"gpt-4\",\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`;
const sanitizedBodyString = sanitizeRequest('https://randostring.ai', bodyString);
expect(sanitizedBodyString).toEqual(bodyString);
});
});
// getRequestWithStreamOption sets `stream` to the caller-supplied value,
// overriding whatever the body contains.
describe('getRequestWithStreamOption', () => {
it('sets stream parameter when stream is not defined in the body', () => {
const body = {
model: 'gpt-4',
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL].forEach((url: string) => {
const sanitizedBodyString = getRequestWithStreamOption(url, JSON.stringify(body), true);
expect(sanitizedBodyString).toEqual(
`{\"model\":\"gpt-4\",\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}],\"stream\":true}`
);
});
});
it('overrides stream parameter if defined in body', () => {
const body = {
model: 'gpt-4',
stream: true,
messages: [
{
role: 'user',
content: 'This is a test',
},
],
};
[OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL].forEach((url: string) => {
const sanitizedBodyString = getRequestWithStreamOption(url, JSON.stringify(body), false);
expect(sanitizedBodyString).toEqual(
`{\"model\":\"gpt-4\",\"stream\":false,\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`
);
});
});
it('does nothing when body is malformed JSON', () => {
const bodyString = `{\"model\":\"gpt-4\",\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}],,}`;
[OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL].forEach((url: string) => {
const sanitizedBodyString = getRequestWithStreamOption(url, bodyString, false);
expect(sanitizedBodyString).toEqual(bodyString);
});
});
it('does nothing when url does not accept stream parameter', () => {
const bodyString = `{\"model\":\"gpt-4\",\"messages\":[{\"role\":\"user\",\"content\":\"This is a test\"}]}`;
const sanitizedBodyString = getRequestWithStreamOption(
'https://randostring.ai',
bodyString,
true
);
expect(sanitizedBodyString).toEqual(bodyString);
});
});
});

View file

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL } from '../../../../common/gen_ai/constants';
// Exact URLs of the OpenAI endpoints that accept a `stream` request parameter.
const APIS_ALLOWING_STREAMING = new Set<string>([OPENAI_CHAT_URL, OPENAI_LEGACY_COMPLETION_URL]);
/**
 * Forces `stream: false` in an OpenAI request body so a caller-supplied
 * payload can never switch the connector into streaming mode when the
 * framework is not prepared to handle a streaming response.
 *
 * Only the ChatCompletion API and the Completion API accept the `stream`
 * parameter; any other URL passes through unchanged.
 */
export const sanitizeRequest = (url: string, body: string): string =>
  getRequestWithStreamOption(url, body, false);
/**
 * Rewrites the OpenAI request body so its `stream` field matches the
 * caller-supplied value, overriding anything already present in the payload.
 *
 * Only the ChatCompletion API and the Completion API accept the `stream`
 * parameter; other URLs — and bodies that are not valid JSON — are returned
 * untouched.
 */
export const getRequestWithStreamOption = (url: string, body: string, stream: boolean): string => {
  const streamingSupported = APIS_ALLOWING_STREAMING.has(url);
  if (!streamingSupported) {
    return body;
  }
  try {
    const payload = JSON.parse(body);
    if (payload) {
      payload.stream = stream;
    }
    return JSON.stringify(payload);
  } catch (err) {
    // Body is not parseable JSON — fall through and return it unchanged.
  }
  return body;
};

View file

@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { sanitizeRequest, getRequestWithStreamOption, getAxiosOptions } from './utils';
import { OpenAiProviderType, OPENAI_CHAT_URL } from '../../../../common/gen_ai/constants';
import {
sanitizeRequest as openAiSanitizeRequest,
getRequestWithStreamOption as openAiGetRequestWithStreamOption,
} from './openai_utils';
import {
sanitizeRequest as azureAiSanitizeRequest,
getRequestWithStreamOption as azureAiGetRequestWithStreamOption,
} from './azure_openai_utils';
jest.mock('./openai_utils');
jest.mock('./azure_openai_utils');
// Covers the provider-dispatching helpers: each call must be routed to the
// matching openai_utils / azure_openai_utils implementation (mocked here),
// and getAxiosOptions must build the right headers/responseType per provider.
describe('Utils', () => {
const azureAiUrl =
'https://test.openai.azure.com/openai/deployments/abc/chat/completions?api-version=2023-06-01-preview';
const bodyString = JSON.stringify({
model: 'gpt-4',
stream: true,
messages: [
{
role: 'user',
content: 'This is a test',
},
],
});
describe('sanitizeRequest', () => {
const mockOpenAiSanitizeRequest = openAiSanitizeRequest as jest.Mock;
const mockAzureAiSanitizeRequest = azureAiSanitizeRequest as jest.Mock;
beforeEach(() => {
jest.clearAllMocks();
});
it('calls openai_utils sanitizeRequest when provider is OpenAi', () => {
sanitizeRequest(OpenAiProviderType.OpenAi, OPENAI_CHAT_URL, bodyString);
expect(mockOpenAiSanitizeRequest).toHaveBeenCalledWith(OPENAI_CHAT_URL, bodyString);
expect(mockAzureAiSanitizeRequest).not.toHaveBeenCalled();
});
it('calls azure_openai_utils sanitizeRequest when provider is AzureAi', () => {
sanitizeRequest(OpenAiProviderType.AzureAi, azureAiUrl, bodyString);
expect(mockAzureAiSanitizeRequest).toHaveBeenCalledWith(azureAiUrl, bodyString);
expect(mockOpenAiSanitizeRequest).not.toHaveBeenCalled();
});
it('does not call any helper fns when provider is unrecognized', () => {
sanitizeRequest('foo', OPENAI_CHAT_URL, bodyString);
expect(mockOpenAiSanitizeRequest).not.toHaveBeenCalled();
expect(mockAzureAiSanitizeRequest).not.toHaveBeenCalled();
});
});
describe('getRequestWithStreamOption', () => {
const mockOpenAiGetRequestWithStreamOption = openAiGetRequestWithStreamOption as jest.Mock;
const mockAzureAiGetRequestWithStreamOption = azureAiGetRequestWithStreamOption as jest.Mock;
beforeEach(() => {
jest.clearAllMocks();
});
it('calls openai_utils getRequestWithStreamOption when provider is OpenAi', () => {
getRequestWithStreamOption(OpenAiProviderType.OpenAi, OPENAI_CHAT_URL, bodyString, true);
expect(mockOpenAiGetRequestWithStreamOption).toHaveBeenCalledWith(
OPENAI_CHAT_URL,
bodyString,
true
);
expect(mockAzureAiGetRequestWithStreamOption).not.toHaveBeenCalled();
});
it('calls azure_openai_utils getRequestWithStreamOption when provider is AzureAi', () => {
getRequestWithStreamOption(OpenAiProviderType.AzureAi, azureAiUrl, bodyString, true);
expect(mockAzureAiGetRequestWithStreamOption).toHaveBeenCalledWith(
azureAiUrl,
bodyString,
true
);
expect(mockOpenAiGetRequestWithStreamOption).not.toHaveBeenCalled();
});
it('does not call any helper fns when provider is unrecognized', () => {
getRequestWithStreamOption('foo', OPENAI_CHAT_URL, bodyString, true);
expect(mockOpenAiGetRequestWithStreamOption).not.toHaveBeenCalled();
expect(mockAzureAiGetRequestWithStreamOption).not.toHaveBeenCalled();
});
});
// getAxiosOptions: auth header per provider; responseType only when streaming.
describe('getAxiosOptions', () => {
it('returns correct axios options when provider is openai and stream is false', () => {
expect(getAxiosOptions(OpenAiProviderType.OpenAi, 'api-abc', false)).toEqual({
headers: { Authorization: `Bearer api-abc`, ['content-type']: 'application/json' },
});
});
it('returns correct axios options when provider is openai and stream is true', () => {
expect(getAxiosOptions(OpenAiProviderType.OpenAi, 'api-abc', true)).toEqual({
headers: { Authorization: `Bearer api-abc`, ['content-type']: 'application/json' },
responseType: 'stream',
});
});
it('returns correct axios options when provider is azure openai and stream is false', () => {
expect(getAxiosOptions(OpenAiProviderType.AzureAi, 'api-abc', false)).toEqual({
headers: { ['api-key']: `api-abc`, ['content-type']: 'application/json' },
});
});
it('returns correct axios options when provider is azure openai and stream is true', () => {
expect(getAxiosOptions(OpenAiProviderType.AzureAi, 'api-abc', true)).toEqual({
headers: { ['api-key']: `api-abc`, ['content-type']: 'application/json' },
responseType: 'stream',
});
});
it('returns empty options when provider is unrecognized', () => {
expect(getAxiosOptions('foo', 'api-abc', true)).toEqual({ headers: {} });
});
});
});

View file

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { AxiosRequestHeaders, AxiosResponse, ResponseType } from 'axios';
import { IncomingMessage } from 'http';
import { OpenAiProviderType } from '../../../../common/gen_ai/constants';
import {
sanitizeRequest as openAiSanitizeRequest,
getRequestWithStreamOption as openAiGetRequestWithStreamOption,
} from './openai_utils';
import {
sanitizeRequest as azureAiSanitizeRequest,
getRequestWithStreamOption as azureAiGetRequestWithStreamOption,
} from './azure_openai_utils';
/**
 * Dispatches request-body sanitization (forcing `stream: false`) to the
 * provider-specific helper. Unrecognized providers get the body back
 * unchanged.
 */
export const sanitizeRequest = (provider: string, url: string, body: string): string => {
  if (provider === OpenAiProviderType.OpenAi) {
    return openAiSanitizeRequest(url, body);
  }
  if (provider === OpenAiProviderType.AzureAi) {
    return azureAiSanitizeRequest(url, body);
  }
  return body;
};
/**
 * Dispatches the `stream`-flag rewrite to the provider-specific helper so the
 * outgoing payload always carries the caller-chosen streaming option.
 * Unrecognized providers get the body back unchanged.
 */
export const getRequestWithStreamOption = (
  provider: string,
  url: string,
  body: string,
  stream: boolean
): string => {
  if (provider === OpenAiProviderType.OpenAi) {
    return openAiGetRequestWithStreamOption(url, body, stream);
  }
  if (provider === OpenAiProviderType.AzureAi) {
    return azureAiGetRequestWithStreamOption(url, body, stream);
  }
  return body;
};
/**
 * Builds the axios options for a Gen AI request: the provider-specific auth
 * header (Bearer token for OpenAI, `api-key` for Azure OpenAI) plus
 * `responseType: 'stream'` when a streaming response was requested.
 * Unrecognized providers yield empty headers and no responseType.
 */
export const getAxiosOptions = (
  provider: string,
  apiKey: string,
  stream: boolean
): { headers: AxiosRequestHeaders; responseType?: ResponseType } => {
  // Only present when streaming, so non-streaming callers see no responseType key.
  const streamOptions = stream ? { responseType: 'stream' as ResponseType } : {};
  if (provider === OpenAiProviderType.OpenAi) {
    return {
      headers: { Authorization: `Bearer ${apiKey}`, ['content-type']: 'application/json' },
      ...streamOptions,
    };
  }
  if (provider === OpenAiProviderType.AzureAi) {
    return {
      headers: { ['api-key']: apiKey, ['content-type']: 'application/json' },
      ...streamOptions,
    };
  }
  return { headers: {} };
};
/**
 * Prepares an axios streaming response body for pass-through to the client.
 */
export const pipeStreamingResponse = (response: AxiosResponse<IncomingMessage>) => {
  // Streaming responses are compressed by the Hapi router by default.
  // Advertising a content type Hapi does not recognize makes it skip
  // compression, so the stream reaches the client as-is.
  const streamBody = response.data;
  streamBody.headers = {
    ['Content-Type']: 'dont-compress-this',
  };
  return streamBody;
};

View file

@ -111,7 +111,7 @@ export default function genAiTest({ getService }: FtrProviderContext) {
statusCode: 400,
error: 'Bad Request',
message:
'error validating action type config: [apiProvider]: expected value of type [string] but got [undefined]',
'error validating action type config: [apiProvider]: expected at least one defined value but got [undefined]',
});
});
});

View file

@ -4346,6 +4346,10 @@
version "0.0.0"
uid ""
"@kbn/gen-ai-streaming-response-example-plugin@link:x-pack/examples/gen_ai_streaming_response_example":
version "0.0.0"
uid ""
"@kbn/generate-console-definitions@link:packages/kbn-generate-console-definitions":
version "0.0.0"
uid ""