mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
* simplify abort controller logic and extract it into a class * docs * delete unused tests * code review * code review * code review Co-authored-by: Liza Katz <lizka.k@gmail.com>
This commit is contained in:
parent
41eb6e2b12
commit
8f5fd0fd63
12 changed files with 216 additions and 228 deletions
|
@ -7,7 +7,7 @@
|
|||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: AbortSignal, options?: ISearchOptions): Error;
|
||||
protected handleSearchError(e: KibanaServerError | AbortError, options?: ISearchOptions, isTimeout?: boolean): Error;
|
||||
```
|
||||
|
||||
## Parameters
|
||||
|
@ -15,8 +15,8 @@ protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: Ab
|
|||
| Parameter | Type | Description |
|
||||
| --- | --- | --- |
|
||||
| e | <code>KibanaServerError | AbortError</code> | |
|
||||
| timeoutSignal | <code>AbortSignal</code> | |
|
||||
| options | <code>ISearchOptions</code> | |
|
||||
| isTimeout | <code>boolean</code> | |
|
||||
|
||||
<b>Returns:</b>
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ export declare class SearchInterceptor
|
|||
| Method | Modifiers | Description |
|
||||
| --- | --- | --- |
|
||||
| [getTimeoutMode()](./kibana-plugin-plugins-data-public.searchinterceptor.gettimeoutmode.md) | | |
|
||||
| [handleSearchError(e, timeoutSignal, options)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | |
|
||||
| [handleSearchError(e, options, isTimeout)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | |
|
||||
| [search(request, options)](./kibana-plugin-plugins-data-public.searchinterceptor.search.md) | | Searches using the given <code>search</code> method. Overrides the <code>AbortSignal</code> with one that will abort either when the request times out, or when the original <code>AbortSignal</code> is aborted. Updates <code>pendingCount$</code> when the request is started/finalized. |
|
||||
| [showError(e)](./kibana-plugin-plugins-data-public.searchinterceptor.showerror.md) | | |
|
||||
|
||||
|
|
|
@ -2330,8 +2330,6 @@ export interface SearchError {
|
|||
// @public (undocumented)
|
||||
export class SearchInterceptor {
|
||||
constructor(deps: SearchInterceptorDeps);
|
||||
// @internal
|
||||
protected abortController: AbortController;
|
||||
// @internal (undocumented)
|
||||
protected application: CoreStart['application'];
|
||||
// (undocumented)
|
||||
|
@ -2342,22 +2340,12 @@ export class SearchInterceptor {
|
|||
// Warning: (ae-forgotten-export) The symbol "AbortError" needs to be exported by the entry point index.d.ts
|
||||
//
|
||||
// (undocumented)
|
||||
protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: AbortSignal, options?: ISearchOptions): Error;
|
||||
protected handleSearchError(e: KibanaServerError | AbortError, options?: ISearchOptions, isTimeout?: boolean): Error;
|
||||
// @internal
|
||||
protected pendingCount$: BehaviorSubject<number>;
|
||||
// @internal (undocumented)
|
||||
protected runSearch(request: IKibanaSearchRequest, options?: ISearchOptions): Promise<IKibanaSearchResponse>;
|
||||
search(request: IKibanaSearchRequest, options?: ISearchOptions): Observable<IKibanaSearchResponse>;
|
||||
// @internal (undocumented)
|
||||
protected setupAbortSignal({ abortSignal, timeout, }: {
|
||||
abortSignal?: AbortSignal;
|
||||
timeout?: number;
|
||||
}): {
|
||||
timeoutSignal: AbortSignal;
|
||||
combinedSignal: AbortSignal;
|
||||
cleanup: () => void;
|
||||
abort: () => void;
|
||||
};
|
||||
// (undocumented)
|
||||
showError(e: Error): void;
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
*/
|
||||
|
||||
import { memoize } from 'lodash';
|
||||
import { BehaviorSubject, throwError, timer, defer, from, Observable, NEVER } from 'rxjs';
|
||||
import { BehaviorSubject, throwError, defer, from, Observable } from 'rxjs';
|
||||
import { catchError, finalize } from 'rxjs/operators';
|
||||
import { PublicMethodsOf } from '@kbn/utility-types';
|
||||
import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public';
|
||||
|
@ -30,11 +30,7 @@ import {
|
|||
getHttpError,
|
||||
} from './errors';
|
||||
import { toMountPoint } from '../../../kibana_react/public';
|
||||
import {
|
||||
AbortError,
|
||||
getCombinedAbortSignal,
|
||||
KibanaServerError,
|
||||
} from '../../../kibana_utils/public';
|
||||
import { AbortError, KibanaServerError } from '../../../kibana_utils/public';
|
||||
import { ISessionService } from './session';
|
||||
|
||||
export interface SearchInterceptorDeps {
|
||||
|
@ -48,12 +44,6 @@ export interface SearchInterceptorDeps {
|
|||
}
|
||||
|
||||
export class SearchInterceptor {
|
||||
/**
|
||||
* `abortController` used to signal all searches to abort.
|
||||
* @internal
|
||||
*/
|
||||
protected abortController = new AbortController();
|
||||
|
||||
/**
|
||||
* Observable that emits when the number of pending requests changes.
|
||||
* @internal
|
||||
|
@ -98,10 +88,10 @@ export class SearchInterceptor {
|
|||
*/
|
||||
protected handleSearchError(
|
||||
e: KibanaServerError | AbortError,
|
||||
timeoutSignal: AbortSignal,
|
||||
options?: ISearchOptions
|
||||
options?: ISearchOptions,
|
||||
isTimeout?: boolean
|
||||
): Error {
|
||||
if (timeoutSignal.aborted || e.message === 'Request timed out') {
|
||||
if (isTimeout || e.message === 'Request timed out') {
|
||||
// Handle a client or a server side timeout
|
||||
const err = new SearchTimeoutError(e, this.getTimeoutMode());
|
||||
|
||||
|
@ -154,60 +144,6 @@ export class SearchInterceptor {
|
|||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
protected setupAbortSignal({
|
||||
abortSignal,
|
||||
timeout,
|
||||
}: {
|
||||
abortSignal?: AbortSignal;
|
||||
timeout?: number;
|
||||
}) {
|
||||
// Schedule this request to automatically timeout after some interval
|
||||
const timeoutController = new AbortController();
|
||||
const { signal: timeoutSignal } = timeoutController;
|
||||
const timeout$ = timeout ? timer(timeout) : NEVER;
|
||||
const subscription = timeout$.subscribe(() => {
|
||||
this.deps.usageCollector?.trackQueryTimedOut();
|
||||
timeoutController.abort();
|
||||
});
|
||||
|
||||
const selfAbortController = new AbortController();
|
||||
|
||||
// Get a combined `AbortSignal` that will be aborted whenever the first of the following occurs:
|
||||
// 1. The internal abort controller aborts
|
||||
// 2. The request times out
|
||||
// 3. abort() is called on `selfAbortController`. This is used by session service to abort all pending searches that it tracks
|
||||
// in the current session
|
||||
// 4. The passed-in signal aborts (e.g. when re-fetching, or whenever the app determines)
|
||||
const signals = [
|
||||
this.abortController.signal,
|
||||
timeoutSignal,
|
||||
selfAbortController.signal,
|
||||
...(abortSignal ? [abortSignal] : []),
|
||||
];
|
||||
|
||||
const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = getCombinedAbortSignal(
|
||||
signals
|
||||
);
|
||||
const cleanup = () => {
|
||||
subscription.unsubscribe();
|
||||
combinedSignal.removeEventListener('abort', cleanup);
|
||||
cleanupCombinedSignal();
|
||||
};
|
||||
combinedSignal.addEventListener('abort', cleanup);
|
||||
|
||||
return {
|
||||
timeoutSignal,
|
||||
combinedSignal,
|
||||
cleanup,
|
||||
abort: () => {
|
||||
selfAbortController.abort();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private showTimeoutErrorToast = (e: SearchTimeoutError, sessionId?: string) => {
|
||||
this.deps.toasts.addDanger({
|
||||
title: 'Timed out',
|
||||
|
@ -245,25 +181,21 @@ export class SearchInterceptor {
|
|||
*/
|
||||
public search(
|
||||
request: IKibanaSearchRequest,
|
||||
options?: ISearchOptions
|
||||
options: ISearchOptions = {}
|
||||
): Observable<IKibanaSearchResponse> {
|
||||
// Defer the following logic until `subscribe` is actually called
|
||||
return defer(() => {
|
||||
if (options?.abortSignal?.aborted) {
|
||||
if (options.abortSignal?.aborted) {
|
||||
return throwError(new AbortError());
|
||||
}
|
||||
|
||||
const { timeoutSignal, combinedSignal, cleanup } = this.setupAbortSignal({
|
||||
abortSignal: options?.abortSignal,
|
||||
});
|
||||
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
|
||||
return from(this.runSearch(request, { ...options, abortSignal: combinedSignal })).pipe(
|
||||
return from(this.runSearch(request, options)).pipe(
|
||||
catchError((e: Error | AbortError) => {
|
||||
return throwError(this.handleSearchError(e, timeoutSignal, options));
|
||||
return throwError(this.handleSearchError(e, options));
|
||||
}),
|
||||
finalize(() => {
|
||||
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
|
||||
cleanup();
|
||||
})
|
||||
);
|
||||
});
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { AbortError, abortSignalToPromise, getCombinedAbortSignal } from './abort_utils';
|
||||
import { AbortError, abortSignalToPromise } from './abort_utils';
|
||||
|
||||
jest.useFakeTimers();
|
||||
|
||||
|
@ -66,91 +66,4 @@ describe('AbortUtils', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('getCombinedAbortSignal', () => {
|
||||
test('should return an AbortSignal', () => {
|
||||
const signal = getCombinedAbortSignal([]).signal;
|
||||
expect(signal).toBeInstanceOf(AbortSignal);
|
||||
});
|
||||
|
||||
test('should not abort if none of the signals abort', async () => {
|
||||
const controller1 = new AbortController();
|
||||
const controller2 = new AbortController();
|
||||
setTimeout(() => controller1.abort(), 2000);
|
||||
setTimeout(() => controller2.abort(), 1000);
|
||||
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
|
||||
expect(signal.aborted).toBe(false);
|
||||
jest.advanceTimersByTime(500);
|
||||
await flushPromises();
|
||||
expect(signal.aborted).toBe(false);
|
||||
});
|
||||
|
||||
test('should abort when the first signal aborts', async () => {
|
||||
const controller1 = new AbortController();
|
||||
const controller2 = new AbortController();
|
||||
setTimeout(() => controller1.abort(), 2000);
|
||||
setTimeout(() => controller2.abort(), 1000);
|
||||
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
|
||||
expect(signal.aborted).toBe(false);
|
||||
jest.advanceTimersByTime(1000);
|
||||
await flushPromises();
|
||||
expect(signal.aborted).toBe(true);
|
||||
});
|
||||
|
||||
test('should be aborted if any of the signals is already aborted', async () => {
|
||||
const controller1 = new AbortController();
|
||||
const controller2 = new AbortController();
|
||||
controller1.abort();
|
||||
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
|
||||
expect(signal.aborted).toBe(true);
|
||||
});
|
||||
|
||||
describe('cleanup listener', () => {
|
||||
const createMockController = () => {
|
||||
const controller = new AbortController();
|
||||
const spyAddListener = jest.spyOn(controller.signal, 'addEventListener');
|
||||
const spyRemoveListener = jest.spyOn(controller.signal, 'removeEventListener');
|
||||
return {
|
||||
controller,
|
||||
getTotalListeners: () =>
|
||||
Math.max(spyAddListener.mock.calls.length - spyRemoveListener.mock.calls.length, 0),
|
||||
};
|
||||
};
|
||||
|
||||
test('cleanup should cleanup inner listeners', () => {
|
||||
const controller1 = createMockController();
|
||||
const controller2 = createMockController();
|
||||
|
||||
const { cleanup } = getCombinedAbortSignal([
|
||||
controller1.controller.signal,
|
||||
controller2.controller.signal,
|
||||
]);
|
||||
|
||||
expect(controller1.getTotalListeners()).toBe(1);
|
||||
expect(controller2.getTotalListeners()).toBe(1);
|
||||
|
||||
cleanup();
|
||||
|
||||
expect(controller1.getTotalListeners()).toBe(0);
|
||||
expect(controller2.getTotalListeners()).toBe(0);
|
||||
});
|
||||
|
||||
test('abort should cleanup inner listeners', async () => {
|
||||
const controller1 = createMockController();
|
||||
const controller2 = createMockController();
|
||||
|
||||
getCombinedAbortSignal([controller1.controller.signal, controller2.controller.signal]);
|
||||
|
||||
expect(controller1.getTotalListeners()).toBe(1);
|
||||
expect(controller2.getTotalListeners()).toBe(1);
|
||||
|
||||
controller1.controller.abort();
|
||||
|
||||
await flushPromises();
|
||||
|
||||
expect(controller1.getTotalListeners()).toBe(0);
|
||||
expect(controller2.getTotalListeners()).toBe(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -45,32 +45,3 @@ export function abortSignalToPromise(
|
|||
|
||||
return { promise, cleanup };
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an `AbortSignal` that will be aborted when the first of the given signals aborts.
|
||||
*
|
||||
* @param signals
|
||||
*/
|
||||
export function getCombinedAbortSignal(
|
||||
signals: AbortSignal[]
|
||||
): { signal: AbortSignal; cleanup: () => void } {
|
||||
const controller = new AbortController();
|
||||
let cleanup = () => {};
|
||||
|
||||
if (signals.some((signal) => signal.aborted)) {
|
||||
controller.abort();
|
||||
} else {
|
||||
const promises = signals.map((signal) => abortSignalToPromise(signal));
|
||||
cleanup = () => {
|
||||
promises.forEach((p) => p.cleanup());
|
||||
controller.signal.removeEventListener('abort', cleanup);
|
||||
};
|
||||
controller.signal.addEventListener('abort', cleanup);
|
||||
Promise.race(promises.map((p) => p.promise)).catch(() => {
|
||||
cleanup();
|
||||
controller.abort();
|
||||
});
|
||||
}
|
||||
|
||||
return { signal: controller.signal, cleanup };
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ export * from './ui';
|
|||
export * from './state_containers';
|
||||
export * from './typed_json';
|
||||
export * from './errors';
|
||||
export { AbortError, abortSignalToPromise, getCombinedAbortSignal } from './abort_utils';
|
||||
export { AbortError, abortSignalToPromise } from './abort_utils';
|
||||
export { createGetterSetter, Get, Set } from './create_getter_setter';
|
||||
export { distinctUntilChangedWithInitialValue } from './distinct_until_changed_with_initial_value';
|
||||
export { url } from './url';
|
||||
|
|
|
@ -15,7 +15,6 @@ export {
|
|||
fieldWildcardFilter,
|
||||
fieldWildcardMatcher,
|
||||
Get,
|
||||
getCombinedAbortSignal,
|
||||
JsonArray,
|
||||
JsonObject,
|
||||
JsonValue,
|
||||
|
|
|
@ -13,7 +13,6 @@ export {
|
|||
fieldWildcardFilter,
|
||||
fieldWildcardMatcher,
|
||||
Get,
|
||||
getCombinedAbortSignal,
|
||||
Set,
|
||||
url,
|
||||
} from '../common';
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { SearchAbortController } from './search_abort_controller';
|
||||
|
||||
const timeTravel = (msToRun = 0) => {
|
||||
jest.advanceTimersByTime(msToRun);
|
||||
return new Promise((resolve) => setImmediate(resolve));
|
||||
};
|
||||
|
||||
describe('search abort controller', () => {
|
||||
test('is not aborted when empty', () => {
|
||||
const sac = new SearchAbortController();
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
});
|
||||
|
||||
test('immediately aborts when passed an aborted signal in the constructor', () => {
|
||||
const controller = new AbortController();
|
||||
controller.abort();
|
||||
const sac = new SearchAbortController(controller.signal);
|
||||
expect(sac.getSignal().aborted).toBe(true);
|
||||
});
|
||||
|
||||
test('aborts when input signal is aborted', () => {
|
||||
const controller = new AbortController();
|
||||
const sac = new SearchAbortController(controller.signal);
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
controller.abort();
|
||||
expect(sac.getSignal().aborted).toBe(true);
|
||||
});
|
||||
|
||||
test('aborts when all input signals are aborted', () => {
|
||||
const controller = new AbortController();
|
||||
const sac = new SearchAbortController(controller.signal);
|
||||
|
||||
const controller2 = new AbortController();
|
||||
sac.addAbortSignal(controller2.signal);
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
controller.abort();
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
controller2.abort();
|
||||
expect(sac.getSignal().aborted).toBe(true);
|
||||
});
|
||||
|
||||
test('aborts explicitly even if all inputs are not aborted', () => {
|
||||
const controller = new AbortController();
|
||||
const sac = new SearchAbortController(controller.signal);
|
||||
|
||||
const controller2 = new AbortController();
|
||||
sac.addAbortSignal(controller2.signal);
|
||||
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
sac.abort();
|
||||
expect(sac.getSignal().aborted).toBe(true);
|
||||
});
|
||||
|
||||
test('doesnt abort, if cleared', () => {
|
||||
const controller = new AbortController();
|
||||
const sac = new SearchAbortController(controller.signal);
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
sac.cleanup();
|
||||
controller.abort();
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
});
|
||||
|
||||
describe('timeout abort', () => {
|
||||
beforeEach(() => {
|
||||
jest.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
test('doesnt abort on timeout, if cleared', () => {
|
||||
const sac = new SearchAbortController(undefined, 100);
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
sac.cleanup();
|
||||
timeTravel(100);
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
});
|
||||
|
||||
test('aborts on timeout, even if no signals passed in', () => {
|
||||
const sac = new SearchAbortController(undefined, 100);
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
timeTravel(100);
|
||||
expect(sac.getSignal().aborted).toBe(true);
|
||||
expect(sac.isTimeout()).toBe(true);
|
||||
});
|
||||
|
||||
test('aborts on timeout, even if there are unaborted signals', () => {
|
||||
const controller = new AbortController();
|
||||
const sac = new SearchAbortController(controller.signal, 100);
|
||||
|
||||
expect(sac.getSignal().aborted).toBe(false);
|
||||
timeTravel(100);
|
||||
expect(sac.getSignal().aborted).toBe(true);
|
||||
expect(sac.isTimeout()).toBe(true);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { Subscription, timer } from 'rxjs';
|
||||
|
||||
export enum AbortReason {
|
||||
Timeout = 'timeout',
|
||||
}
|
||||
|
||||
export class SearchAbortController {
|
||||
private inputAbortSignals: AbortSignal[] = new Array();
|
||||
private abortController: AbortController = new AbortController();
|
||||
private timeoutSub?: Subscription;
|
||||
private destroyed = false;
|
||||
private reason?: AbortReason;
|
||||
|
||||
constructor(abortSignal?: AbortSignal, timeout?: number) {
|
||||
if (abortSignal) {
|
||||
this.addAbortSignal(abortSignal);
|
||||
}
|
||||
|
||||
if (timeout) {
|
||||
this.timeoutSub = timer(timeout).subscribe(() => {
|
||||
this.reason = AbortReason.Timeout;
|
||||
this.abortController.abort();
|
||||
this.timeoutSub!.unsubscribe();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private abortHandler = () => {
|
||||
const allAborted = this.inputAbortSignals.every((signal) => signal.aborted);
|
||||
if (allAborted) {
|
||||
this.abortController.abort();
|
||||
this.cleanup();
|
||||
}
|
||||
};
|
||||
|
||||
public cleanup() {
|
||||
this.destroyed = true;
|
||||
this.timeoutSub?.unsubscribe();
|
||||
this.inputAbortSignals.forEach((abortSignal) => {
|
||||
abortSignal.removeEventListener('abort', this.abortHandler);
|
||||
});
|
||||
}
|
||||
|
||||
public addAbortSignal(inputSignal: AbortSignal) {
|
||||
if (this.destroyed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.inputAbortSignals.push(inputSignal);
|
||||
|
||||
if (inputSignal.aborted) {
|
||||
this.abortHandler();
|
||||
} else {
|
||||
// abort our internal controller if the input signal aborts
|
||||
inputSignal.addEventListener('abort', this.abortHandler);
|
||||
}
|
||||
}
|
||||
|
||||
public getSignal() {
|
||||
return this.abortController.signal;
|
||||
}
|
||||
|
||||
public abort() {
|
||||
this.cleanup();
|
||||
this.abortController.abort();
|
||||
}
|
||||
|
||||
public isTimeout() {
|
||||
return this.reason === AbortReason.Timeout;
|
||||
}
|
||||
}
|
|
@ -17,6 +17,7 @@ import {
|
|||
SearchSessionState,
|
||||
} from '../../../../../src/plugins/data/public';
|
||||
import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common';
|
||||
import { SearchAbortController } from './search_abort_controller';
|
||||
|
||||
export class EnhancedSearchInterceptor extends SearchInterceptor {
|
||||
private uiSettingsSub: Subscription;
|
||||
|
@ -47,31 +48,30 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
|
|||
}
|
||||
|
||||
public search({ id, ...request }: IKibanaSearchRequest, options: IAsyncSearchOptions = {}) {
|
||||
const { combinedSignal, timeoutSignal, cleanup, abort } = this.setupAbortSignal({
|
||||
abortSignal: options.abortSignal,
|
||||
timeout: this.searchTimeout,
|
||||
});
|
||||
const strategy = options?.strategy ?? ENHANCED_ES_SEARCH_STRATEGY;
|
||||
const searchOptions = { ...options, strategy, abortSignal: combinedSignal };
|
||||
const searchOptions = {
|
||||
strategy: ENHANCED_ES_SEARCH_STRATEGY,
|
||||
...options,
|
||||
};
|
||||
const { sessionId, strategy, abortSignal } = searchOptions;
|
||||
const search = () => this.runSearch({ id, ...request }, searchOptions);
|
||||
|
||||
const searchAbortController = new SearchAbortController(abortSignal, this.searchTimeout);
|
||||
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
|
||||
|
||||
const untrackSearch =
|
||||
this.deps.session.isCurrentSession(options.sessionId) &&
|
||||
this.deps.session.trackSearch({ abort });
|
||||
const untrackSearch = this.deps.session.isCurrentSession(options.sessionId)
|
||||
? this.deps.session.trackSearch({ abort: () => searchAbortController.abort() })
|
||||
: undefined;
|
||||
|
||||
// track if this search's session will be send to background
|
||||
// if yes, then we don't need to cancel this search when it is aborted
|
||||
let isSavedToBackground = false;
|
||||
const savedToBackgroundSub =
|
||||
this.deps.session.isCurrentSession(options.sessionId) &&
|
||||
this.deps.session.isCurrentSession(sessionId) &&
|
||||
this.deps.session.state$
|
||||
.pipe(
|
||||
skip(1), // ignore any state, we are only interested in transition x -> BackgroundLoading
|
||||
filter(
|
||||
(state) =>
|
||||
this.deps.session.isCurrentSession(options.sessionId) &&
|
||||
this.deps.session.isCurrentSession(sessionId) &&
|
||||
state === SearchSessionState.BackgroundLoading
|
||||
),
|
||||
take(1)
|
||||
|
@ -84,15 +84,18 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
|
|||
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
|
||||
});
|
||||
|
||||
return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe(
|
||||
return pollSearch(search, cancel, {
|
||||
...options,
|
||||
abortSignal: searchAbortController.getSignal(),
|
||||
}).pipe(
|
||||
tap((response) => (id = response.id)),
|
||||
catchError((e: Error) => {
|
||||
cancel();
|
||||
return throwError(this.handleSearchError(e, timeoutSignal, options));
|
||||
return throwError(this.handleSearchError(e, options, searchAbortController.isTimeout()));
|
||||
}),
|
||||
finalize(() => {
|
||||
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
|
||||
cleanup();
|
||||
searchAbortController.cleanup();
|
||||
if (untrackSearch && this.deps.session.isCurrentSession(options.sessionId)) {
|
||||
// untrack if this search still belongs to current session
|
||||
untrackSearch();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue