[Sessions] Extract search abort controllers logic into a separate class (#95688)

* simplify abort controller logic and extract it into a class

* docs

* delete unused tests

* code review

* code review

* code review
This commit is contained in:
Liza Katz 2021-03-30 15:24:30 +03:00 committed by GitHub
parent 907b5c860d
commit b58dd3efe8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 216 additions and 228 deletions

View file

@ -7,7 +7,7 @@
<b>Signature:</b>
```typescript
protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: AbortSignal, options?: ISearchOptions): Error;
protected handleSearchError(e: KibanaServerError | AbortError, options?: ISearchOptions, isTimeout?: boolean): Error;
```
## Parameters
@ -15,8 +15,8 @@ protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: Ab
| Parameter | Type | Description |
| --- | --- | --- |
| e | <code>KibanaServerError &#124; AbortError</code> | |
| timeoutSignal | <code>AbortSignal</code> | |
| options | <code>ISearchOptions</code> | |
| isTimeout | <code>boolean</code> | |
<b>Returns:</b>

View file

@ -27,7 +27,7 @@ export declare class SearchInterceptor
| Method | Modifiers | Description |
| --- | --- | --- |
| [getTimeoutMode()](./kibana-plugin-plugins-data-public.searchinterceptor.gettimeoutmode.md) | | |
| [handleSearchError(e, timeoutSignal, options)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | |
| [handleSearchError(e, options, isTimeout)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | |
| [search(request, options)](./kibana-plugin-plugins-data-public.searchinterceptor.search.md) | | Searches using the given <code>search</code> method. Overrides the <code>AbortSignal</code> with one that will abort either when the request times out, or when the original <code>AbortSignal</code> is aborted. Updates <code>pendingCount$</code> when the request is started/finalized. |
| [showError(e)](./kibana-plugin-plugins-data-public.searchinterceptor.showerror.md) | | |

View file

@ -2322,8 +2322,6 @@ export interface SearchError {
// @public (undocumented)
export class SearchInterceptor {
constructor(deps: SearchInterceptorDeps);
// @internal
protected abortController: AbortController;
// @internal (undocumented)
protected application: CoreStart['application'];
// (undocumented)
@ -2334,22 +2332,12 @@ export class SearchInterceptor {
// Warning: (ae-forgotten-export) The symbol "AbortError" needs to be exported by the entry point index.d.ts
//
// (undocumented)
protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: AbortSignal, options?: ISearchOptions): Error;
protected handleSearchError(e: KibanaServerError | AbortError, options?: ISearchOptions, isTimeout?: boolean): Error;
// @internal
protected pendingCount$: BehaviorSubject<number>;
// @internal (undocumented)
protected runSearch(request: IKibanaSearchRequest, options?: ISearchOptions): Promise<IKibanaSearchResponse>;
search(request: IKibanaSearchRequest, options?: ISearchOptions): Observable<IKibanaSearchResponse>;
// @internal (undocumented)
protected setupAbortSignal({ abortSignal, timeout, }: {
abortSignal?: AbortSignal;
timeout?: number;
}): {
timeoutSignal: AbortSignal;
combinedSignal: AbortSignal;
cleanup: () => void;
abort: () => void;
};
// (undocumented)
showError(e: Error): void;
}

View file

@ -7,7 +7,7 @@
*/
import { memoize } from 'lodash';
import { BehaviorSubject, throwError, timer, defer, from, Observable, NEVER } from 'rxjs';
import { BehaviorSubject, throwError, defer, from, Observable } from 'rxjs';
import { catchError, finalize } from 'rxjs/operators';
import { PublicMethodsOf } from '@kbn/utility-types';
import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public';
@ -30,11 +30,7 @@ import {
getHttpError,
} from './errors';
import { toMountPoint } from '../../../kibana_react/public';
import {
AbortError,
getCombinedAbortSignal,
KibanaServerError,
} from '../../../kibana_utils/public';
import { AbortError, KibanaServerError } from '../../../kibana_utils/public';
import { ISessionService } from './session';
export interface SearchInterceptorDeps {
@ -48,12 +44,6 @@ export interface SearchInterceptorDeps {
}
export class SearchInterceptor {
/**
* `abortController` used to signal all searches to abort.
* @internal
*/
protected abortController = new AbortController();
/**
* Observable that emits when the number of pending requests changes.
* @internal
@ -98,10 +88,10 @@ export class SearchInterceptor {
*/
protected handleSearchError(
e: KibanaServerError | AbortError,
timeoutSignal: AbortSignal,
options?: ISearchOptions
options?: ISearchOptions,
isTimeout?: boolean
): Error {
if (timeoutSignal.aborted || e.message === 'Request timed out') {
if (isTimeout || e.message === 'Request timed out') {
// Handle a client or a server side timeout
const err = new SearchTimeoutError(e, this.getTimeoutMode());
@ -154,60 +144,6 @@ export class SearchInterceptor {
);
}
/**
* @internal
*/
protected setupAbortSignal({
abortSignal,
timeout,
}: {
abortSignal?: AbortSignal;
timeout?: number;
}) {
// Schedule this request to automatically timeout after some interval
const timeoutController = new AbortController();
const { signal: timeoutSignal } = timeoutController;
const timeout$ = timeout ? timer(timeout) : NEVER;
const subscription = timeout$.subscribe(() => {
this.deps.usageCollector?.trackQueryTimedOut();
timeoutController.abort();
});
const selfAbortController = new AbortController();
// Get a combined `AbortSignal` that will be aborted whenever the first of the following occurs:
// 1. The internal abort controller aborts
// 2. The request times out
// 3. abort() is called on `selfAbortController`. This is used by session service to abort all pending searches that it tracks
// in the current session
// 4. The passed-in signal aborts (e.g. when re-fetching, or whenever the app determines)
const signals = [
this.abortController.signal,
timeoutSignal,
selfAbortController.signal,
...(abortSignal ? [abortSignal] : []),
];
const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = getCombinedAbortSignal(
signals
);
const cleanup = () => {
subscription.unsubscribe();
combinedSignal.removeEventListener('abort', cleanup);
cleanupCombinedSignal();
};
combinedSignal.addEventListener('abort', cleanup);
return {
timeoutSignal,
combinedSignal,
cleanup,
abort: () => {
selfAbortController.abort();
},
};
}
private showTimeoutErrorToast = (e: SearchTimeoutError, sessionId?: string) => {
this.deps.toasts.addDanger({
title: 'Timed out',
@ -245,25 +181,21 @@ export class SearchInterceptor {
*/
public search(
request: IKibanaSearchRequest,
options?: ISearchOptions
options: ISearchOptions = {}
): Observable<IKibanaSearchResponse> {
// Defer the following logic until `subscribe` is actually called
return defer(() => {
if (options?.abortSignal?.aborted) {
if (options.abortSignal?.aborted) {
return throwError(new AbortError());
}
const { timeoutSignal, combinedSignal, cleanup } = this.setupAbortSignal({
abortSignal: options?.abortSignal,
});
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
return from(this.runSearch(request, { ...options, abortSignal: combinedSignal })).pipe(
return from(this.runSearch(request, options)).pipe(
catchError((e: Error | AbortError) => {
return throwError(this.handleSearchError(e, timeoutSignal, options));
return throwError(this.handleSearchError(e, options));
}),
finalize(() => {
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
cleanup();
})
);
});

View file

@ -6,7 +6,7 @@
* Side Public License, v 1.
*/
import { AbortError, abortSignalToPromise, getCombinedAbortSignal } from './abort_utils';
import { AbortError, abortSignalToPromise } from './abort_utils';
jest.useFakeTimers();
@ -66,91 +66,4 @@ describe('AbortUtils', () => {
});
});
});
describe('getCombinedAbortSignal', () => {
test('should return an AbortSignal', () => {
const signal = getCombinedAbortSignal([]).signal;
expect(signal).toBeInstanceOf(AbortSignal);
});
test('should not abort if none of the signals abort', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(500);
await flushPromises();
expect(signal.aborted).toBe(false);
});
test('should abort when the first signal aborts', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(1000);
await flushPromises();
expect(signal.aborted).toBe(true);
});
test('should be aborted if any of the signals is already aborted', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
controller1.abort();
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(true);
});
describe('cleanup listener', () => {
const createMockController = () => {
const controller = new AbortController();
const spyAddListener = jest.spyOn(controller.signal, 'addEventListener');
const spyRemoveListener = jest.spyOn(controller.signal, 'removeEventListener');
return {
controller,
getTotalListeners: () =>
Math.max(spyAddListener.mock.calls.length - spyRemoveListener.mock.calls.length, 0),
};
};
test('cleanup should cleanup inner listeners', () => {
const controller1 = createMockController();
const controller2 = createMockController();
const { cleanup } = getCombinedAbortSignal([
controller1.controller.signal,
controller2.controller.signal,
]);
expect(controller1.getTotalListeners()).toBe(1);
expect(controller2.getTotalListeners()).toBe(1);
cleanup();
expect(controller1.getTotalListeners()).toBe(0);
expect(controller2.getTotalListeners()).toBe(0);
});
test('abort should cleanup inner listeners', async () => {
const controller1 = createMockController();
const controller2 = createMockController();
getCombinedAbortSignal([controller1.controller.signal, controller2.controller.signal]);
expect(controller1.getTotalListeners()).toBe(1);
expect(controller2.getTotalListeners()).toBe(1);
controller1.controller.abort();
await flushPromises();
expect(controller1.getTotalListeners()).toBe(0);
expect(controller2.getTotalListeners()).toBe(0);
});
});
});
});

View file

@ -45,32 +45,3 @@ export function abortSignalToPromise(
return { promise, cleanup };
}
/**
* Returns an `AbortSignal` that will be aborted when the first of the given signals aborts.
*
* @param signals
*/
export function getCombinedAbortSignal(
signals: AbortSignal[]
): { signal: AbortSignal; cleanup: () => void } {
const controller = new AbortController();
let cleanup = () => {};
if (signals.some((signal) => signal.aborted)) {
controller.abort();
} else {
const promises = signals.map((signal) => abortSignalToPromise(signal));
cleanup = () => {
promises.forEach((p) => p.cleanup());
controller.signal.removeEventListener('abort', cleanup);
};
controller.signal.addEventListener('abort', cleanup);
Promise.race(promises.map((p) => p.promise)).catch(() => {
cleanup();
controller.abort();
});
}
return { signal: controller.signal, cleanup };
}

View file

@ -13,7 +13,7 @@ export * from './ui';
export * from './state_containers';
export * from './typed_json';
export * from './errors';
export { AbortError, abortSignalToPromise, getCombinedAbortSignal } from './abort_utils';
export { AbortError, abortSignalToPromise } from './abort_utils';
export { createGetterSetter, Get, Set } from './create_getter_setter';
export { distinctUntilChangedWithInitialValue } from './distinct_until_changed_with_initial_value';
export { url } from './url';

View file

@ -15,7 +15,6 @@ export {
fieldWildcardFilter,
fieldWildcardMatcher,
Get,
getCombinedAbortSignal,
JsonArray,
JsonObject,
JsonValue,

View file

@ -13,7 +13,6 @@ export {
fieldWildcardFilter,
fieldWildcardMatcher,
Get,
getCombinedAbortSignal,
Set,
url,
} from '../common';

View file

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SearchAbortController } from './search_abort_controller';
const timeTravel = (msToRun = 0) => {
jest.advanceTimersByTime(msToRun);
return new Promise((resolve) => setImmediate(resolve));
};
describe('search abort controller', () => {
test('is not aborted when empty', () => {
const sac = new SearchAbortController();
expect(sac.getSignal().aborted).toBe(false);
});
test('immediately aborts when passed an aborted signal in the constructor', () => {
const controller = new AbortController();
controller.abort();
const sac = new SearchAbortController(controller.signal);
expect(sac.getSignal().aborted).toBe(true);
});
test('aborts when input signal is aborted', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal);
expect(sac.getSignal().aborted).toBe(false);
controller.abort();
expect(sac.getSignal().aborted).toBe(true);
});
test('aborts when all input signals are aborted', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal);
const controller2 = new AbortController();
sac.addAbortSignal(controller2.signal);
expect(sac.getSignal().aborted).toBe(false);
controller.abort();
expect(sac.getSignal().aborted).toBe(false);
controller2.abort();
expect(sac.getSignal().aborted).toBe(true);
});
test('aborts explicitly even if all inputs are not aborted', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal);
const controller2 = new AbortController();
sac.addAbortSignal(controller2.signal);
expect(sac.getSignal().aborted).toBe(false);
sac.abort();
expect(sac.getSignal().aborted).toBe(true);
});
test('doesnt abort, if cleared', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal);
expect(sac.getSignal().aborted).toBe(false);
sac.cleanup();
controller.abort();
expect(sac.getSignal().aborted).toBe(false);
});
describe('timeout abort', () => {
beforeEach(() => {
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
test('doesnt abort on timeout, if cleared', () => {
const sac = new SearchAbortController(undefined, 100);
expect(sac.getSignal().aborted).toBe(false);
sac.cleanup();
timeTravel(100);
expect(sac.getSignal().aborted).toBe(false);
});
test('aborts on timeout, even if no signals passed in', () => {
const sac = new SearchAbortController(undefined, 100);
expect(sac.getSignal().aborted).toBe(false);
timeTravel(100);
expect(sac.getSignal().aborted).toBe(true);
expect(sac.isTimeout()).toBe(true);
});
test('aborts on timeout, even if there are unaborted signals', () => {
const controller = new AbortController();
const sac = new SearchAbortController(controller.signal, 100);
expect(sac.getSignal().aborted).toBe(false);
timeTravel(100);
expect(sac.getSignal().aborted).toBe(true);
expect(sac.isTimeout()).toBe(true);
});
});
});

View file

@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Subscription, timer } from 'rxjs';
export enum AbortReason {
Timeout = 'timeout',
}
export class SearchAbortController {
private inputAbortSignals: AbortSignal[] = new Array();
private abortController: AbortController = new AbortController();
private timeoutSub?: Subscription;
private destroyed = false;
private reason?: AbortReason;
constructor(abortSignal?: AbortSignal, timeout?: number) {
if (abortSignal) {
this.addAbortSignal(abortSignal);
}
if (timeout) {
this.timeoutSub = timer(timeout).subscribe(() => {
this.reason = AbortReason.Timeout;
this.abortController.abort();
this.timeoutSub!.unsubscribe();
});
}
}
private abortHandler = () => {
const allAborted = this.inputAbortSignals.every((signal) => signal.aborted);
if (allAborted) {
this.abortController.abort();
this.cleanup();
}
};
public cleanup() {
this.destroyed = true;
this.timeoutSub?.unsubscribe();
this.inputAbortSignals.forEach((abortSignal) => {
abortSignal.removeEventListener('abort', this.abortHandler);
});
}
public addAbortSignal(inputSignal: AbortSignal) {
if (this.destroyed) {
return;
}
this.inputAbortSignals.push(inputSignal);
if (inputSignal.aborted) {
this.abortHandler();
} else {
// abort our internal controller if the input signal aborts
inputSignal.addEventListener('abort', this.abortHandler);
}
}
public getSignal() {
return this.abortController.signal;
}
public abort() {
this.cleanup();
this.abortController.abort();
}
public isTimeout() {
return this.reason === AbortReason.Timeout;
}
}

View file

@ -17,6 +17,7 @@ import {
SearchSessionState,
} from '../../../../../src/plugins/data/public';
import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common';
import { SearchAbortController } from './search_abort_controller';
export class EnhancedSearchInterceptor extends SearchInterceptor {
private uiSettingsSub: Subscription;
@ -47,31 +48,30 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
}
public search({ id, ...request }: IKibanaSearchRequest, options: IAsyncSearchOptions = {}) {
const { combinedSignal, timeoutSignal, cleanup, abort } = this.setupAbortSignal({
abortSignal: options.abortSignal,
timeout: this.searchTimeout,
});
const strategy = options?.strategy ?? ENHANCED_ES_SEARCH_STRATEGY;
const searchOptions = { ...options, strategy, abortSignal: combinedSignal };
const searchOptions = {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
...options,
};
const { sessionId, strategy, abortSignal } = searchOptions;
const search = () => this.runSearch({ id, ...request }, searchOptions);
const searchAbortController = new SearchAbortController(abortSignal, this.searchTimeout);
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
const untrackSearch =
this.deps.session.isCurrentSession(options.sessionId) &&
this.deps.session.trackSearch({ abort });
const untrackSearch = this.deps.session.isCurrentSession(options.sessionId)
? this.deps.session.trackSearch({ abort: () => searchAbortController.abort() })
: undefined;
// track if this search's session will be send to background
// if yes, then we don't need to cancel this search when it is aborted
let isSavedToBackground = false;
const savedToBackgroundSub =
this.deps.session.isCurrentSession(options.sessionId) &&
this.deps.session.isCurrentSession(sessionId) &&
this.deps.session.state$
.pipe(
skip(1), // ignore any state, we are only interested in transition x -> BackgroundLoading
filter(
(state) =>
this.deps.session.isCurrentSession(options.sessionId) &&
this.deps.session.isCurrentSession(sessionId) &&
state === SearchSessionState.BackgroundLoading
),
take(1)
@ -84,15 +84,18 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
});
return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe(
return pollSearch(search, cancel, {
...options,
abortSignal: searchAbortController.getSignal(),
}).pipe(
tap((response) => (id = response.id)),
catchError((e: Error) => {
cancel();
return throwError(this.handleSearchError(e, timeoutSignal, options));
return throwError(this.handleSearchError(e, options, searchAbortController.isTimeout()));
}),
finalize(() => {
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
cleanup();
searchAbortController.cleanup();
if (untrackSearch && this.deps.session.isCurrentSession(options.sessionId)) {
// untrack if this search still belongs to current session
untrackSearch();