Abort cancelled search requests to Elasticsearch (#56788)

* Update abort controller library

* Bootstrap

* Abort when the request is aborted

* Add utility and update value suggestions route

* Remove bad merge

* Revert switching abort controller libraries

* Revert package.json in lib

* Move to previous abort controller

* Fix test to use fake timers to run debounced handlers

* Fix loading bar not going away when cancelling

* Add test for loading count

* Fix test

* Fix failing test

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Lukas Olson 2020-02-25 13:51:02 -07:00 committed by GitHub
parent d9e3d744ba
commit 7b4c809fc7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 161 additions and 19 deletions

View file

@ -35,12 +35,9 @@ describe('Sync search strategy', () => {
core: mockCoreStart,
getSearchStrategy: jest.fn(),
});
syncSearch.search(
{
serverStrategy: SYNC_SEARCH_STRATEGY,
},
{}
);
const request = { serverStrategy: SYNC_SEARCH_STRATEGY };
syncSearch.search(request, {});
expect(mockCoreStart.http.fetch.mock.calls[0][0]).toEqual({
path: `/internal/search/${SYNC_SEARCH_STRATEGY}`,
body: JSON.stringify({
@ -50,4 +47,47 @@ describe('Sync search strategy', () => {
signal: undefined,
});
});
it('increments and decrements loading count on success', async () => {
const expectedLoadingCountValues = [0, 1, 0];
const receivedLoadingCountValues: number[] = [];
mockCoreStart.http.fetch.mockResolvedValueOnce('response');
const syncSearch = syncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn(),
});
const request = { serverStrategy: SYNC_SEARCH_STRATEGY };
const loadingCount$ = mockCoreStart.http.addLoadingCountSource.mock.calls[0][0];
loadingCount$.subscribe(value => receivedLoadingCountValues.push(value));
await syncSearch.search(request, {}).toPromise();
expect(receivedLoadingCountValues).toEqual(expectedLoadingCountValues);
});
it('increments and decrements loading count on failure', async () => {
expect.assertions(1);
const expectedLoadingCountValues = [0, 1, 0];
const receivedLoadingCountValues: number[] = [];
mockCoreStart.http.fetch.mockRejectedValueOnce('error');
const syncSearch = syncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn(),
});
const request = { serverStrategy: SYNC_SEARCH_STRATEGY };
const loadingCount$ = mockCoreStart.http.addLoadingCountSource.mock.calls[0][0];
loadingCount$.subscribe(value => receivedLoadingCountValues.push(value));
try {
await syncSearch.search(request, {}).toPromise();
} catch (e) {
expect(receivedLoadingCountValues).toEqual(expectedLoadingCountValues);
}
});
});

View file

@ -18,7 +18,8 @@
*/
import { BehaviorSubject, from } from 'rxjs';
import { IKibanaSearchRequest, IKibanaSearchResponse } from '../../common/search';
import { finalize } from 'rxjs/operators';
import { IKibanaSearchRequest } from '../../common/search';
import { ISearch, ISearchOptions } from './i_search';
import { TSearchStrategyProvider, ISearchStrategy, ISearchContext } from './types';
@ -40,16 +41,14 @@ export const syncSearchStrategyProvider: TSearchStrategyProvider<typeof SYNC_SEA
) => {
loadingCount$.next(loadingCount$.getValue() + 1);
const response: Promise<IKibanaSearchResponse> = context.core.http.fetch({
path: `/internal/search/${request.serverStrategy}`,
method: 'POST',
body: JSON.stringify(request),
signal: options.signal,
});
response.then(() => loadingCount$.next(loadingCount$.getValue() - 1));
return from(response);
return from(
context.core.http.fetch({
path: `/internal/search/${request.serverStrategy}`,
method: 'POST',
body: JSON.stringify(request),
signal: options.signal,
})
).pipe(finalize(() => loadingCount$.next(loadingCount$.getValue() - 1)));
};
return { search };

View file

@ -23,6 +23,7 @@ import { IRouter } from 'kibana/server';
import { IFieldType, Filter } from '../index';
import { findIndexPatternById, getFieldByName } from '../index_patterns';
import { getRequestAbortedSignal } from '../lib';
export function registerValueSuggestionsRoute(router: IRouter) {
router.post(
@ -50,6 +51,7 @@ export function registerValueSuggestionsRoute(router: IRouter) {
const { field: fieldName, query, boolFilter } = request.body;
const { index } = request.params;
const { dataClient } = context.core.elasticsearch;
const signal = getRequestAbortedSignal(request.events.aborted$);
const autocompleteSearchOptions = {
timeout: await uiSettings.get<number>('kibana.autocompleteTimeout'),
@ -62,7 +64,7 @@ export function registerValueSuggestionsRoute(router: IRouter) {
const body = await getBody(autocompleteSearchOptions, field || fieldName, query, boolFilter);
try {
const result = await dataClient.callAsCurrentUser('search', { index, body });
const result = await dataClient.callAsCurrentUser('search', { index, body }, { signal });
const buckets: any[] =
get(result, 'aggregations.suggestions.buckets') ||

View file

@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Subject } from 'rxjs';
import { getRequestAbortedSignal } from './get_request_aborted_signal';
describe('abortableRequestHandler', () => {
jest.useFakeTimers();
it('should call abort if disconnected', () => {
const abortedSubject = new Subject<void>();
const aborted$ = abortedSubject.asObservable();
const onAborted = jest.fn();
const signal = getRequestAbortedSignal(aborted$);
signal.addEventListener('abort', onAborted);
// Shouldn't be aborted or call onAborted prior to disconnecting
expect(signal.aborted).toBe(false);
expect(onAborted).not.toBeCalled();
abortedSubject.next();
jest.runAllTimers();
// Should be aborted and call onAborted after disconnecting
expect(signal.aborted).toBe(true);
expect(onAborted).toBeCalled();
});
});

View file

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from 'rxjs';
// @ts-ignore not typed
import { AbortController } from 'abortcontroller-polyfill/dist/cjs-ponyfill';
/**
* A simple utility function that returns an `AbortSignal` corresponding to an `AbortController`
* which aborts when the given request is aborted.
* @param aborted$ The observable of abort events (usually `request.events.aborted$`)
*/
export function getRequestAbortedSignal(aborted$: Observable<void>): AbortSignal {
const controller = new AbortController();
aborted$.subscribe(() => controller.abort());
return controller.signal;
}

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { getRequestAbortedSignal } from './get_request_aborted_signal';

View file

@ -19,6 +19,7 @@
import { schema } from '@kbn/config-schema';
import { IRouter } from '../../../../core/server';
import { getRequestAbortedSignal } from '../lib';
export function registerSearchRoute(router: IRouter): void {
router.post(
@ -35,8 +36,10 @@ export function registerSearchRoute(router: IRouter): void {
async (context, request, res) => {
const searchRequest = request.body;
const strategy = request.params.strategy;
const signal = getRequestAbortedSignal(request.events.aborted$);
try {
const response = await context.search!.search(searchRequest, {}, strategy);
const response = await context.search!.search(searchRequest, { signal }, strategy);
return res.ok({ body: response });
} catch (err) {
return res.customError({