[Expressions] Fix the execution pipeline not to stop on a flaky subexpression (#143852) (#143887)

* Fix the execution pipeline not to stop on a flaky subexpression
* Fix the execution pipeline not to stop on an invalid or incorrect value

(cherry picked from commit ee6aeba68f)

Co-authored-by: Michael Dokolin <mikhail.dokolin@elastic.co>
This commit is contained in:
Kibana Machine 2022-10-24 15:19:55 -04:00 committed by GitHub
parent 3db77af5d7
commit 896166585e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 169 additions and 36 deletions

View file

@ -14,6 +14,7 @@ import { parseExpression, ExpressionAstExpression } from '../ast';
import { createUnitTestExecutor } from '../test_helpers';
import { ExpressionFunctionDefinition } from '..';
import { ExecutionContract } from './execution_contract';
import { ExpressionValueBoxed } from '../expression_types';
beforeAll(() => {
if (typeof performance === 'undefined') {
@ -744,6 +745,79 @@ describe('Execution', () => {
});
});
});
test('continues execution when error state is gone', async () => {
testScheduler.run(({ cold, expectObservable, flush }) => {
const a = 1;
const b = 2;
const c = 3;
const d = 4;
const observable$ = cold('abcd|', { a, b, c, d });
const flakyFn = jest
.fn()
.mockImplementationOnce((value) => value)
.mockImplementationOnce(() => {
throw new Error('Some error.');
})
.mockReturnValueOnce({ type: 'something' })
.mockImplementationOnce((value) => value);
const spyFn = jest.fn((input, { arg }) => arg);
const executor = createUnitTestExecutor();
executor.registerFunction({
name: 'observable',
args: {},
help: '',
fn: () => observable$,
});
executor.registerFunction({
name: 'flaky',
args: {},
help: '',
fn: (value) => flakyFn(value),
});
executor.registerFunction({
name: 'spy',
args: {
arg: {
help: '',
types: ['number'],
},
},
help: '',
fn: (input, args) => spyFn(input, args),
});
const result = executor.run('spy arg={observable | flaky}', null, {});
expectObservable(result).toBe('abcd|', {
a: { partial: true, result: a },
b: {
partial: true,
result: {
type: 'error',
error: expect.objectContaining({ message: '[spy] > [flaky] > Some error.' }),
},
},
c: {
partial: true,
result: {
type: 'error',
error: expect.objectContaining({
message: `[spy] > Can not cast 'something' to any of 'number'`,
}),
},
},
d: { partial: false, result: d },
});
flush();
expect(spyFn).toHaveBeenCalledTimes(2);
expect(spyFn).toHaveBeenNthCalledWith(1, null, { arg: a });
expect(spyFn).toHaveBeenNthCalledWith(2, null, { arg: d });
});
});
});
describe('when arguments are missing', () => {
@ -847,6 +921,38 @@ describe('Execution', () => {
});
});
describe('when arguments are incorrect', () => {
it('when required argument is missing and has not alias, returns error', async () => {
const incorrectArg: ExpressionFunctionDefinition<
'incorrectArg',
unknown,
{ arg: ExpressionValueBoxed<'something'> },
unknown
> = {
name: 'incorrectArg',
args: {
arg: {
help: '',
required: true,
types: ['something'],
},
},
help: '',
fn: jest.fn(),
};
const executor = createUnitTestExecutor();
executor.registerFunction(incorrectArg);
const { result } = await lastValueFrom(executor.run('incorrectArg arg="string"', null, {}));
expect(result).toMatchObject({
type: 'error',
error: {
message: `[incorrectArg] > Can not cast 'string' to any of 'something'`,
},
});
});
});
describe('debug mode', () => {
test('can execute expression in debug mode', async () => {
const execution = createExecution('add val=1 | add val=2 | add val=3', {}, true);

View file

@ -351,20 +351,30 @@ export class Execution<
// actually have `then` or `subscribe` methods which would be treated as a `Promise`
// or an `Observable` accordingly.
return this.resolveArgs(fn, currentInput, fnArgs).pipe(
tap((args) => this.execution.params.debug && Object.assign(head.debug, { args })),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))),
tap((output) => this.execution.params.debug && Object.assign(head.debug, { output })),
switchMap((output) => this.invokeChain<ChainOutput>(tail, output)),
catchError((rawError) => {
const error = createError(rawError);
error.error.message = `[${fnName}] > ${error.error.message}`;
switchMap((resolvedArgs) => {
const args$ = isExpressionValueError(resolvedArgs)
? throwError(resolvedArgs.error)
: of(resolvedArgs);
if (this.execution.params.debug) {
Object.assign(head.debug, { error, rawError, success: false });
}
return args$.pipe(
tap((args) => this.execution.params.debug && Object.assign(head.debug, { args })),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) =>
getType(output) === 'error' ? throwError(output) : of(output)
),
tap((output) => this.execution.params.debug && Object.assign(head.debug, { output })),
switchMap((output) => this.invokeChain<ChainOutput>(tail, output)),
catchError((rawError) => {
const error = createError(rawError);
error.error.message = `[${fnName}] > ${error.error.message}`;
return of(error);
if (this.execution.params.debug) {
Object.assign(head.debug, { error, rawError, success: false });
}
return of(error);
})
);
}),
finalize(() => {
if (this.execution.params.debug) {
@ -448,7 +458,10 @@ export class Execution<
}
}
throw new Error(`Can not cast '${fromTypeName}' to any of '${toTypeNames.join(', ')}'`);
throw createError({
name: 'invalid value',
message: `Can not cast '${fromTypeName}' to any of '${toTypeNames.join(', ')}'`,
});
}
validate<Type = unknown>(value: Type, argDef: ExpressionFunctionParameter<Type>): void {
@ -458,7 +471,10 @@ export class Execution<
}': '${argDef.options.join("', '")}'`;
if (argDef.strict) {
throw new Error(message);
throw createError({
message,
name: 'invalid argument',
});
}
this.logger?.warn(message);
@ -470,7 +486,7 @@ export class Execution<
fnDef: Fn,
input: unknown,
argAsts: Record<string, ExpressionAstArgument[]>
): Observable<Record<string, unknown>> {
): Observable<Record<string, unknown> | ExpressionValueError> {
return defer(() => {
const { args: argDefs } = fnDef;
@ -480,7 +496,10 @@ export class Execution<
(acc, argAst, argName) => {
const argDef = getByAlias(argDefs, argName);
if (!argDef) {
throw new Error(`Unknown argument '${argName}' passed to function '${fnDef.name}'`);
throw createError({
name: 'unknown argument',
message: `Unknown argument '${argName}' passed to function '${fnDef.name}'`,
});
}
if (argDef.deprecated && !acc[argDef.name]) {
this.logger?.warn(`Argument '${argName}' is deprecated in function '${fnDef.name}'`);
@ -501,7 +520,10 @@ export class Execution<
continue;
}
throw new Error(`${fnDef.name} requires the "${name}" argument`);
throw createError({
name: 'missing argument',
message: `${fnDef.name} requires the "${name}" argument`,
});
}
// Create the functions to resolve the argument ASTs into values
@ -512,14 +534,17 @@ export class Execution<
(subInput = input) =>
this.interpret(item, subInput).pipe(
pluck('result'),
map((output) => {
switchMap((output) => {
if (isExpressionValueError(output)) {
throw output.error;
return of(output);
}
return this.cast(output, argDefs[argName].types);
}),
tap((value) => this.validate(value, argDefs[argName]))
return of(output).pipe(
map((value) => this.cast(value, argDefs[argName].types)),
tap((value) => this.validate(value, argDefs[argName])),
catchError((error) => of(error))
);
})
)
)
);
@ -530,7 +555,7 @@ export class Execution<
return from([{}]);
}
const resolvedArgValuesObservable = combineLatest(
return combineLatest(
argNames.map((argName) => {
const interpretFns = resolveArgFns[argName];
@ -541,23 +566,25 @@ export class Execution<
}
return argDefs[argName].resolve
? combineLatest(interpretFns.map((fn) => fn()))
? combineLatest(interpretFns.map((fn) => fn())).pipe(
map((values) => values.find(isExpressionValueError) ?? values)
)
: of(interpretFns);
})
);
return resolvedArgValuesObservable.pipe(
map((resolvedArgValues) =>
mapValues(
// Return an object here because the arguments themselves might actually have a 'then'
// function which would be treated as a promise
zipObject(argNames, resolvedArgValues),
// Just return the last unless the argument definition allows multiple
(argValues, argName) => (argDefs[argName].multi ? argValues : last(argValues))
)
).pipe(
map(
(values) =>
values.find(isExpressionValueError) ??
mapValues(
// Return an object here because the arguments themselves might actually have a 'then'
// function which would be treated as a promise
zipObject(argNames, values as unknown[][]),
// Just return the last unless the argument definition allows multiple
(argValues, argName) => (argDefs[argName].multi ? argValues : last(argValues))
)
)
);
});
}).pipe(catchError((error) => of(error)));
}
interpret<T>(ast: ExpressionAstNode, input: T): Observable<ExecutionResult<unknown>> {