[Expressions] Fix expressions chain invocation not to unsubscribe on error (#142105)

This commit is contained in:
Michael Dokolin 2022-09-29 08:38:58 +02:00 committed by GitHub
parent ea046acd24
commit 6a0b2fd717
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 132 additions and 76 deletions

View file

@ -488,6 +488,63 @@ describe('Execution', () => {
expect(spy.fn).toHaveBeenCalledTimes(0);
});
test('continues execution when error state is gone', async () => {
testScheduler.run(({ cold, expectObservable, flush }) => {
const a = 1;
const b = 2;
const c = 3;
const observable$ = cold('abc|', { a, b, c });
const flakyFn = jest
.fn()
.mockImplementationOnce((value) => value)
.mockImplementationOnce(() => {
throw new Error('Some error.');
})
.mockImplementationOnce((value) => value);
const spyFn = jest.fn((value) => value);
const executor = createUnitTestExecutor();
executor.registerFunction({
name: 'observable',
args: {},
help: '',
fn: () => observable$,
});
executor.registerFunction({
name: 'flaky',
args: {},
help: '',
fn: (value) => flakyFn(value),
});
executor.registerFunction({
name: 'spy',
args: {},
help: '',
fn: (value) => spyFn(value),
});
const result = executor.run('observable | flaky | spy', null, {});
expectObservable(result).toBe('abc|', {
a: { partial: true, result: a },
b: {
partial: true,
result: {
type: 'error',
error: expect.objectContaining({ message: '[flaky] > Some error.' }),
},
},
c: { partial: false, result: c },
});
flush();
expect(spyFn).toHaveBeenCalledTimes(2);
expect(spyFn).toHaveBeenNthCalledWith(1, a);
expect(spyFn).toHaveBeenNthCalledWith(2, c);
});
});
});
describe('state', () => {

View file

@ -295,87 +295,86 @@ export class Execution<
}
invokeChain<ChainOutput = unknown>(
chainArr: ExpressionAstFunction[],
[head, ...tail]: ExpressionAstFunction[],
input: unknown
): Observable<ChainOutput> {
): Observable<ChainOutput | ExpressionValueError> {
if (!head) {
return of(input as ChainOutput);
}
return of(input).pipe(
...(chainArr.map((link) =>
switchMap((currentInput) => {
const { function: fnName, arguments: fnArgs } = link;
const fn = getByAlias(
this.state.get().functions,
fnName,
this.execution.params.namespace
);
switchMap((currentInput) => {
const { function: fnName, arguments: fnArgs } = head;
const fn = getByAlias(this.state.get().functions, fnName, this.execution.params.namespace);
if (!fn) {
throw createError({
name: 'fn not found',
message: i18n.translate('expressions.execution.functionNotFound', {
defaultMessage: `Function {fnName} could not be found.`,
values: {
fnName,
},
}),
});
}
if (fn.disabled) {
throw createError({
name: 'fn is disabled',
message: i18n.translate('expressions.execution.functionDisabled', {
defaultMessage: `Function {fnName} is disabled.`,
values: {
fnName,
},
}),
});
}
if (fn.deprecated) {
this.logger?.warn(`Function '${fnName}' is deprecated`);
}
if (this.execution.params.debug) {
link.debug = {
args: {},
duration: 0,
fn: fn.name,
input: currentInput,
success: true,
};
}
const timeStart = this.execution.params.debug ? now() : 0;
// `resolveArgs` returns an object because the arguments themselves might
// actually have `then` or `subscribe` methods which would be treated as a `Promise`
// or an `Observable` accordingly.
return this.resolveArgs(fn, currentInput, fnArgs).pipe(
tap((args) => this.execution.params.debug && Object.assign(link.debug, { args })),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))),
tap((output) => this.execution.params.debug && Object.assign(link.debug, { output })),
catchError((rawError) => {
const error = createError(rawError);
error.error.message = `[${fnName}] > ${error.error.message}`;
if (this.execution.params.debug) {
Object.assign(link.debug, { error, rawError, success: false });
}
return throwError(error);
if (!fn) {
throw createError({
name: 'fn not found',
message: i18n.translate('expressions.execution.functionNotFound', {
defaultMessage: `Function {fnName} could not be found.`,
values: {
fnName,
},
}),
finalize(() => {
if (this.execution.params.debug) {
Object.assign(link.debug, { duration: now() - timeStart });
}
})
);
})
) as Parameters<Observable<unknown>['pipe']>),
});
}
if (fn.disabled) {
throw createError({
name: 'fn is disabled',
message: i18n.translate('expressions.execution.functionDisabled', {
defaultMessage: `Function {fnName} is disabled.`,
values: {
fnName,
},
}),
});
}
if (fn.deprecated) {
this.logger?.warn(`Function '${fnName}' is deprecated`);
}
if (this.execution.params.debug) {
head.debug = {
args: {},
duration: 0,
fn: fn.name,
input: currentInput,
success: true,
};
}
const timeStart = this.execution.params.debug ? now() : 0;
// `resolveArgs` returns an object because the arguments themselves might
// actually have `then` or `subscribe` methods which would be treated as a `Promise`
// or an `Observable` accordingly.
return this.resolveArgs(fn, currentInput, fnArgs).pipe(
tap((args) => this.execution.params.debug && Object.assign(head.debug, { args })),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))),
tap((output) => this.execution.params.debug && Object.assign(head.debug, { output })),
switchMap((output) => this.invokeChain<ChainOutput>(tail, output)),
catchError((rawError) => {
const error = createError(rawError);
error.error.message = `[${fnName}] > ${error.error.message}`;
if (this.execution.params.debug) {
Object.assign(head.debug, { error, rawError, success: false });
}
return of(error);
}),
finalize(() => {
if (this.execution.params.debug) {
Object.assign(head.debug, { duration: now() - timeStart });
}
})
);
}),
catchError((error) => of(error))
) as Observable<ChainOutput>;
);
}
invokeFunction<Fn extends ExpressionFunction>(