[Expressions] Fix expressions execution abortion to prevent performance issues (#117714) (#117998)

This commit is contained in:
Michael Dokolin 2021-11-09 14:16:19 +01:00 committed by GitHub
parent b9cd07f8e4
commit b5fbca1bc0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -16,7 +16,6 @@ import {
from,
isObservable,
of,
race,
throwError,
Observable,
ReplaySubject,
@ -25,7 +24,7 @@ import { catchError, finalize, map, pluck, shareReplay, switchMap, tap } from 'r
import { Executor } from '../executor';
import { createExecutionContainer, ExecutionContainer } from './container';
import { createError } from '../util';
import { abortSignalToPromise, now } from '../../../kibana_utils/common';
import { now, AbortError } from '../../../kibana_utils/common';
import { Adapters } from '../../../inspector/common';
import { isExpressionValueError, ExpressionValueError } from '../expression_types/specs/error';
import {
@ -50,13 +49,6 @@ type UnwrapReturnType<Function extends (...args: any[]) => unknown> =
? UnwrapObservable<ReturnType<Function>>
: UnwrapPromiseOrReturn<ReturnType<Function>>;
// type ArgumentsOf<Function extends ExpressionFunction> = Function extends ExpressionFunction<
// unknown,
// infer Arguments
// >
// ? Arguments
// : never;
/**
* The result returned after an expression function execution.
*/
@ -95,6 +87,51 @@ const createAbortErrorValue = () =>
name: 'AbortError',
});
function markPartial<T>() {
return (source: Observable<T>) =>
new Observable<ExecutionResult<T>>((subscriber) => {
let latest: ExecutionResult<T> | undefined;
subscriber.add(
source.subscribe({
next: (result) => {
latest = { result, partial: true };
subscriber.next(latest);
},
error: (error) => subscriber.error(error),
complete: () => {
if (latest) {
latest.partial = false;
}
subscriber.complete();
},
})
);
subscriber.add(() => {
latest = undefined;
});
});
}
function takeUntilAborted<T>(signal: AbortSignal) {
return (source: Observable<T>) =>
new Observable<T>((subscriber) => {
const throwAbortError = () => {
subscriber.error(new AbortError());
};
subscriber.add(source.subscribe(subscriber));
subscriber.add(() => signal.removeEventListener('abort', throwAbortError));
signal.addEventListener('abort', throwAbortError);
if (signal.aborted) {
throwAbortError();
}
});
}
export interface ExecutionParams {
executor: Executor;
ast?: ExpressionAstExpression;
@ -138,18 +175,6 @@ export class Execution<
*/
private readonly abortController = getNewAbortController();
/**
* Promise that rejects if/when abort controller sends "abort" signal.
*/
private readonly abortRejection = abortSignalToPromise(this.abortController.signal);
/**
* Races a given observable against the "abort" event of `abortController`.
*/
private race<T>(observable: Observable<T>): Observable<T> {
return race(from(this.abortRejection.promise), observable);
}
/**
* Whether .start() method has been called.
*/
@ -221,32 +246,9 @@ export class Execution<
this.result = this.input$.pipe(
switchMap((input) =>
this.race(this.invokeChain<Output>(this.state.get().ast.chain, input)).pipe(
(source) =>
new Observable<ExecutionResult<Output>>((subscriber) => {
let latest: ExecutionResult<Output> | undefined;
subscriber.add(
source.subscribe({
next: (result) => {
latest = { result, partial: true };
subscriber.next(latest);
},
error: (error) => subscriber.error(error),
complete: () => {
if (latest) {
latest.partial = false;
}
subscriber.complete();
},
})
);
subscriber.add(() => {
latest = undefined;
});
})
this.invokeChain<Output>(this.state.get().ast.chain, input).pipe(
takeUntilAborted(this.abortController.signal),
markPartial()
)
),
catchError((error) => {
@ -265,7 +267,6 @@ export class Execution<
},
error: (error) => this.state.transitions.setError(error),
}),
finalize(() => this.abortRejection.cleanup()),
shareReplay(1)
);
}
@ -356,9 +357,9 @@ export class Execution<
// `resolveArgs` returns an object because the arguments themselves might
// actually have `then` or `subscribe` methods which would be treated as a `Promise`
// or an `Observable` accordingly.
return this.race(this.resolveArgs(fn, currentInput, fnArgs)).pipe(
return this.resolveArgs(fn, currentInput, fnArgs).pipe(
tap((args) => this.execution.params.debug && Object.assign(link.debug, { args })),
switchMap((args) => this.race(this.invokeFunction(fn, currentInput, args))),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))),
tap((output) => this.execution.params.debug && Object.assign(link.debug, { output })),
catchError((rawError) => {
@ -390,7 +391,7 @@ export class Execution<
): Observable<UnwrapReturnType<Fn['fn']>> {
return of(input).pipe(
map((currentInput) => this.cast(currentInput, fn.inputTypes)),
switchMap((normalizedInput) => this.race(of(fn.fn(normalizedInput, args, this.context)))),
switchMap((normalizedInput) => of(fn.fn(normalizedInput, args, this.context))),
switchMap(
(fnResult) =>
(isObservable(fnResult)