[8.14] [Obs AI Assistant] Keep connection open, limit no of fields (#186811) (#187131)

# Backport

This will backport the following commits from `main` to `8.14`:
- [[Obs AI Assistant] Keep connection open, limit no of fields
(#186811)](https://github.com/elastic/kibana/pull/186811)

<!--- Backport version: 7.3.2 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT {commits} BACKPORT-->
This commit is contained in:
Dario Gieselaar 2024-07-17 18:34:02 +02:00 committed by GitHub
parent 5afbed3165
commit 5dd1739bbf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 28 additions and 14 deletions

View file

@@ -33,7 +33,7 @@ export async function getRelevantFieldNames({
messages: Message[];
chat: FunctionCallChatFunction;
signal: AbortSignal;
}): Promise<{ fields: string[] }> {
}): Promise<{ fields: string[]; stats: { analyzed: number; total: number } }> {
const dataViewsService = await dataViews.dataViewsServiceFactory(savedObjectsClient, esClient);
const fields = await dataViewsService.getFieldsForWildcard({
@@ -71,8 +71,13 @@ export async function getRelevantFieldNames({
const groupedFields = groupBy(allFields, (field) => field.name);
const MAX_CHUNKS = 5;
const FIELD_NAMES_PER_CHUNK = 250;
const fieldNamesToAnalyze = fieldNames.slice(0, MAX_CHUNKS * FIELD_NAMES_PER_CHUNK);
const relevantFields = await Promise.all(
chunk(fieldNames, 500).map(async (fieldsInChunk) => {
chunk(fieldNamesToAnalyze, FIELD_NAMES_PER_CHUNK).map(async (fieldsInChunk) => {
const chunkResponse$ = (
await chat('get_relevent_dataset_names', {
signal,
@@ -138,5 +143,8 @@ export async function getRelevantFieldNames({
})
);
return { fields: relevantFields.flat() };
return {
fields: relevantFields.flat(),
stats: { analyzed: fieldNamesToAnalyze.length, total: fieldNames.length },
};
}

View file

@@ -47,7 +47,7 @@ export function registerGetDatasetInfoFunction({
try {
const body = await esClient.indices.resolveIndex({
name: index === '' ? '*' : index,
name: index === '' ? '*' : index.split(','),
expand_wildcards: 'open',
});
indices = [
@@ -86,11 +86,11 @@ export function registerGetDatasetInfoFunction({
signal,
chat,
});
return {
content: {
indices: [index],
fields: relevantFieldNames,
fields: relevantFieldNames.fields,
stats: relevantFieldNames.stats,
},
};
}

View file

@@ -6,7 +6,7 @@
*/
import { repeat } from 'lodash';
import { identity, Observable, OperatorFunction } from 'rxjs';
import { Observable, OperatorFunction } from 'rxjs';
import {
BufferFlushEvent,
StreamingChatResponseEventType,
@@ -22,10 +22,6 @@ import {
export function flushBuffer<T extends StreamingChatResponseEventWithoutError | TokenCountEvent>(
isCloud: boolean
): OperatorFunction<T, T | BufferFlushEvent> {
if (!isCloud) {
return identity;
}
return (source: Observable<T>) =>
new Observable<T | BufferFlushEvent>((subscriber) => {
const cloudProxyBufferSize = 4096;
@@ -41,7 +37,15 @@ export function flushBuffer<T extends StreamingChatResponseEventWithoutError | T
}
};
const intervalId = setInterval(flushBufferIfNeeded, 250);
const keepAlive = () => {
subscriber.next({
data: '0',
type: StreamingChatResponseEventType.BufferFlush,
});
};
const flushIntervalId = isCloud ? setInterval(flushBufferIfNeeded, 250) : undefined;
const keepAliveIntervalId = setInterval(keepAlive, 30_000);
source.subscribe({
next: (value) => {
@@ -52,11 +56,13 @@ export function flushBuffer<T extends StreamingChatResponseEventWithoutError | T
subscriber.next(value);
},
error: (error) => {
clearInterval(intervalId);
clearInterval(flushIntervalId);
clearInterval(keepAliveIntervalId);
subscriber.error(error);
},
complete: () => {
clearInterval(intervalId);
clearInterval(flushIntervalId);
clearInterval(keepAliveIntervalId);
subscriber.complete();
},
});