[inference] surface error message from connector (#207393)

## Summary

In case of error during the connector's execution, the inference
adapters were not properly propagating the error message. This PR
addresses it.

### Before

<img width="368" alt="Screenshot 2025-01-21 at 14 05 30"
src="https://github.com/user-attachments/assets/65cce33d-cdca-442e-bf31-9bf09c4c6800"
/>

### After

<img width="738" alt="Screenshot 2025-01-21 at 14 04 44"
src="https://github.com/user-attachments/assets/7d4fdee8-5989-47a1-8e56-21621f9b79fc"
/>
This commit is contained in:
Pierre Gayvallet 2025-01-22 08:23:16 +01:00 committed by GitHub
parent c12c88d243
commit 52be832724
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 138 additions and 9 deletions

View file

@ -7,6 +7,7 @@
import { PassThrough } from 'stream';
import { loggerMock } from '@kbn/logging-mocks';
import { lastValueFrom, toArray } from 'rxjs';
import type { InferenceExecutor } from '../../utils/inference_executor';
import { MessageRole, ToolChoiceType } from '@kbn/inference-common';
import { bedrockClaudeAdapter } from './bedrock_claude_adapter';
@ -464,5 +465,30 @@ describe('bedrockClaudeAdapter', () => {
}),
});
});
it('throws an error if the connector response is in error', async () => {
executorMock.invoke.mockImplementation(async () => {
return {
actionId: 'actionId',
status: 'error',
serviceMessage: 'something went wrong',
data: undefined,
};
});
await expect(
lastValueFrom(
bedrockClaudeAdapter
.chatComplete({
logger,
executor: executorMock,
messages: [{ role: MessageRole.User, content: 'Hello' }],
})
.pipe(toArray())
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Error calling connector: something went wrong"`
);
});
});
});

View file

@ -5,8 +5,8 @@
* 2.0.
*/
import { filter, from, map, switchMap, tap } from 'rxjs';
import { Readable } from 'stream';
import { filter, from, map, switchMap, tap, throwError } from 'rxjs';
import { isReadable, Readable } from 'stream';
import {
Message,
MessageRole,
@ -56,8 +56,19 @@ export const bedrockClaudeAdapter: InferenceConnectorAdapter = {
})
).pipe(
switchMap((response) => {
const readable = response.data as Readable;
return serdeEventstreamIntoObservable(readable);
if (response.status === 'error') {
return throwError(() =>
createInferenceInternalError(`Error calling connector: ${response.serviceMessage}`, {
rootError: response.serviceMessage,
})
);
}
if (isReadable(response.data as any)) {
return serdeEventstreamIntoObservable(response.data as Readable);
}
return throwError(() =>
createInferenceInternalError('Unexpected error', response.data as Record<string, any>)
);
}),
tap((eventData) => {
if ('modelStreamErrorException' in eventData) {

View file

@ -536,5 +536,30 @@ describe('geminiAdapter', () => {
}),
});
});
it('throws an error if the connector response is in error', async () => {
executorMock.invoke.mockImplementation(async () => {
return {
actionId: 'actionId',
status: 'error',
serviceMessage: 'something went wrong',
data: undefined,
};
});
await expect(
lastValueFrom(
geminiAdapter
.chatComplete({
logger,
executor: executorMock,
messages: [{ role: MessageRole.User, content: 'Hello' }],
})
.pipe(toArray())
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Error calling connector: something went wrong"`
);
});
});
});

View file

@ -6,9 +6,10 @@
*/
import * as Gemini from '@google/generative-ai';
import { from, map, switchMap } from 'rxjs';
import { Readable } from 'stream';
import { from, map, switchMap, throwError } from 'rxjs';
import { isReadable, Readable } from 'stream';
import {
createInferenceInternalError,
Message,
MessageRole,
ToolChoiceType,
@ -48,8 +49,19 @@ export const geminiAdapter: InferenceConnectorAdapter = {
})
).pipe(
switchMap((response) => {
const readable = response.data as Readable;
return eventSourceStreamIntoObservable(readable);
if (response.status === 'error') {
return throwError(() =>
createInferenceInternalError(`Error calling connector: ${response.serviceMessage}`, {
rootError: response.serviceMessage,
})
);
}
if (isReadable(response.data as any)) {
return eventSourceStreamIntoObservable(response.data as Readable);
}
return throwError(() =>
createInferenceInternalError('Unexpected error', response.data as Record<string, any>)
);
}),
map((line) => {
return JSON.parse(line) as GenerateContentResponseChunk;

View file

@ -182,5 +182,29 @@ describe('inferenceAdapter', () => {
}),
});
});
it('throws an error if the connector response is in error', async () => {
executorMock.invoke.mockImplementation(async () => {
return {
actionId: 'actionId',
status: 'error',
serviceMessage: 'something went wrong',
data: undefined,
};
});
await expect(
lastValueFrom(
inferenceAdapter
.chatComplete({
...defaultArgs,
messages: [{ role: MessageRole.User, content: 'Hello' }],
})
.pipe(toArray())
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Error calling connector: something went wrong"`
);
});
});
});

View file

@ -72,7 +72,7 @@ export const inferenceAdapter: InferenceConnectorAdapter = {
switchMap((response) => {
if (response.status === 'error') {
return throwError(() =>
createInferenceInternalError('Error calling the inference API', {
createInferenceInternalError(`Error calling connector: ${response.serviceMessage}`, {
rootError: response.serviceMessage,
})
);

View file

@ -395,6 +395,30 @@ describe('openAIAdapter', () => {
});
});
it('throws an error if the connector response is in error', async () => {
executorMock.invoke.mockImplementation(async () => {
return {
actionId: 'actionId',
status: 'error',
serviceMessage: 'something went wrong',
data: undefined,
};
});
await expect(
lastValueFrom(
openAIAdapter
.chatComplete({
...defaultArgs,
messages: [{ role: MessageRole.User, content: 'Hello' }],
})
.pipe(toArray())
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Error calling connector: something went wrong"`
);
});
it('emits chunk events', async () => {
const response$ = openAIAdapter.chatComplete({
...defaultArgs,

View file

@ -69,6 +69,13 @@ export const openAIAdapter: InferenceConnectorAdapter = {
})
).pipe(
switchMap((response) => {
if (response.status === 'error') {
return throwError(() =>
createInferenceInternalError(`Error calling connector: ${response.serviceMessage}`, {
rootError: response.serviceMessage,
})
);
}
if (isReadable(response.data as any)) {
return eventSourceStreamIntoObservable(response.data as Readable);
}