[Search service] Add async search strategy (#53057)

* Add async search strategy

* Add async search

* Fix async strategy and add tests

* Move types to separate file

* Revert changes to demo search

* Update demo search strategy to use async

* Add cancellation to search strategies

* Add tests

* Simplify async search strategy

* Move loadingCount to search strategy

* Update abort controller library

* Bootstrap

* Abort when the request is aborted

* Add utility and update value suggestions route

* Fix bad merge conflict

* Update tests

* Move to data_enhanced plugin

* Remove bad merge

* Revert switching abort controller libraries

* Revert package.json in lib

* Move to previous abort controller

* Fix test to use fake timers to run debounced handlers

* Revert changes to example plugin

* Fix loading bar not going away when cancelling

* Call getSearchStrategy instead of passing  directly

* Add async demo search strategy

* Fix error with setting state

* Update how aborting works

* Fix type checks

* Add test for loading count

* Attempt to fix broken example test

* Revert changes to test

* Fix test

* Update name to camelCase

* Fix failing test

* Don't require data_enhanced in example plugin

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Lukas Olson 2020-02-25 15:57:40 -07:00
parent 44225a9bfe
commit b0336a7634
27 changed files with 601 additions and 28 deletions

View file

@ -20,6 +20,7 @@
import { IKibanaSearchRequest, IKibanaSearchResponse } from '../../../src/plugins/data/public';
export const DEMO_SEARCH_STRATEGY = 'DEMO_SEARCH_STRATEGY';
export const ASYNC_DEMO_SEARCH_STRATEGY = 'ASYNC_DEMO_SEARCH_STRATEGY';
export interface IDemoRequest extends IKibanaSearchRequest {
mood: string | 'sad' | 'happy';
@ -29,3 +30,11 @@ export interface IDemoRequest extends IKibanaSearchRequest {
export interface IDemoResponse extends IKibanaSearchResponse {
greeting: string;
}
export interface IAsyncDemoRequest extends IKibanaSearchRequest {
fibonacciNumbers: number;
}
export interface IAsyncDemoResponse extends IKibanaSearchResponse {
fibonacciSequence: number[];
}

View file

@ -2,7 +2,7 @@
"id": "demoSearch",
"version": "0.0.1",
"kibanaVersion": "kibana",
"configPath": ["demo_search"],
"configPath": ["demoSearch"],
"server": true,
"ui": true,
"requiredPlugins": ["data"],

View file

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from 'rxjs';
import {
ISearchContext,
TSearchStrategyProvider,
ISearchStrategy,
} from '../../../src/plugins/data/public';
import { ASYNC_DEMO_SEARCH_STRATEGY, IAsyncDemoResponse } from '../common';
import { ASYNC_SEARCH_STRATEGY } from '../../../x-pack/plugins/data_enhanced/public';
/**
* This demo search strategy provider simply provides a shortcut for calling the DEMO_ASYNC_SEARCH_STRATEGY
* on the server side, without users having to pass it in explicitly, and it takes advantage of the
* already registered ASYNC_SEARCH_STRATEGY that exists on the client.
*
* so instead of callers having to do:
*
* ```
* search(
* { ...request, serverStrategy: DEMO_ASYNC_SEARCH_STRATEGY },
* options,
* ASYNC_SEARCH_STRATEGY
* ) as Observable<IDemoResponse>,
*```
* They can instead just do
*
* ```
* search(request, options, DEMO_ASYNC_SEARCH_STRATEGY);
* ```
*
* and are ensured type safety in regard to the request and response objects.
*
* @param context - context supplied by other plugins.
* @param search - a search function to access other strategies that have already been registered.
*/
export const asyncDemoClientSearchStrategyProvider: TSearchStrategyProvider<typeof ASYNC_DEMO_SEARCH_STRATEGY> = (
context: ISearchContext
): ISearchStrategy<typeof ASYNC_DEMO_SEARCH_STRATEGY> => {
const asyncStrategyProvider = context.getSearchStrategy(ASYNC_SEARCH_STRATEGY);
const { search } = asyncStrategyProvider(context);
return {
search: (request, options) => {
return search(
{ ...request, serverStrategy: ASYNC_DEMO_SEARCH_STRATEGY },
options
) as Observable<IAsyncDemoResponse>;
},
};
};

View file

@ -43,7 +43,7 @@ import { DEMO_SEARCH_STRATEGY, IDemoResponse } from '../common';
* ```
* context.search(request, options, DEMO_SEARCH_STRATEGY);
* ```
*
*
* and are ensured type safety in regard to the request and response objects.
*
* @param context - context supplied by other plugins.

View file

@ -19,9 +19,16 @@
import { DataPublicPluginSetup } from '../../../src/plugins/data/public';
import { Plugin, CoreSetup } from '../../../src/core/public';
import { DEMO_SEARCH_STRATEGY } from '../common';
import {
DEMO_SEARCH_STRATEGY,
IDemoRequest,
IDemoResponse,
ASYNC_DEMO_SEARCH_STRATEGY,
IAsyncDemoRequest,
IAsyncDemoResponse,
} from '../common';
import { demoClientSearchStrategyProvider } from './demo_search_strategy';
import { IDemoRequest, IDemoResponse } from '../common';
import { asyncDemoClientSearchStrategyProvider } from './async_demo_search_strategy';
interface DemoDataSearchSetupDependencies {
data: DataPublicPluginSetup;
@ -39,10 +46,12 @@ interface DemoDataSearchSetupDependencies {
declare module '../../../src/plugins/data/public' {
export interface IRequestTypesMap {
[DEMO_SEARCH_STRATEGY]: IDemoRequest;
[ASYNC_DEMO_SEARCH_STRATEGY]: IAsyncDemoRequest;
}
export interface IResponseTypesMap {
[DEMO_SEARCH_STRATEGY]: IDemoResponse;
[ASYNC_DEMO_SEARCH_STRATEGY]: IAsyncDemoResponse;
}
}
@ -52,6 +61,10 @@ export class DemoDataPlugin implements Plugin {
DEMO_SEARCH_STRATEGY,
demoClientSearchStrategyProvider
);
deps.data.search.registerSearchStrategyProvider(
ASYNC_DEMO_SEARCH_STRATEGY,
asyncDemoClientSearchStrategyProvider
);
}
public start() {}

View file

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { TSearchStrategyProvider } from '../../../src/plugins/data/server';
import { ASYNC_DEMO_SEARCH_STRATEGY } from '../common';
function getFibonacciSequence(n = 0) {
const beginning = [0, 1].slice(0, n);
return Array(Math.max(0, n))
.fill(null)
.reduce((sequence, value, i) => {
if (i < 2) return sequence;
return [...sequence, sequence[i - 1] + sequence[i - 2]];
}, beginning);
}
const generateId = (() => {
let id = 0;
return () => `${id++}`;
})();
const loadedMap = new Map<string, number>();
const totalMap = new Map<string, number>();
export const asyncDemoSearchStrategyProvider: TSearchStrategyProvider<typeof ASYNC_DEMO_SEARCH_STRATEGY> = () => {
return {
search: async request => {
const id = request.id ?? generateId();
const loaded = (loadedMap.get(id) ?? 0) + 1;
loadedMap.set(id, loaded);
const total = request.fibonacciNumbers ?? totalMap.get(id);
totalMap.set(id, total);
const fibonacciSequence = getFibonacciSequence(loaded);
return { id, total, loaded, fibonacciSequence };
},
cancel: async id => {
loadedMap.delete(id);
totalMap.delete(id);
},
};
};

View file

@ -20,7 +20,15 @@
import { Plugin, CoreSetup, PluginInitializerContext } from 'kibana/server';
import { PluginSetup as DataPluginSetup } from 'src/plugins/data/server';
import { demoSearchStrategyProvider } from './demo_search_strategy';
import { DEMO_SEARCH_STRATEGY, IDemoRequest, IDemoResponse } from '../common';
import {
DEMO_SEARCH_STRATEGY,
IDemoRequest,
IDemoResponse,
ASYNC_DEMO_SEARCH_STRATEGY,
IAsyncDemoRequest,
IAsyncDemoResponse,
} from '../common';
import { asyncDemoSearchStrategyProvider } from './async_demo_search_strategy';
interface IDemoSearchExplorerDeps {
data: DataPluginSetup;
@ -38,10 +46,12 @@ interface IDemoSearchExplorerDeps {
declare module '../../../src/plugins/data/server' {
export interface IRequestTypesMap {
[DEMO_SEARCH_STRATEGY]: IDemoRequest;
[ASYNC_DEMO_SEARCH_STRATEGY]: IAsyncDemoRequest;
}
export interface IResponseTypesMap {
[DEMO_SEARCH_STRATEGY]: IDemoResponse;
[ASYNC_DEMO_SEARCH_STRATEGY]: IAsyncDemoResponse;
}
}
@ -54,6 +64,11 @@ export class DemoDataPlugin implements Plugin<void, void, IDemoSearchExplorerDep
DEMO_SEARCH_STRATEGY,
demoSearchStrategyProvider
);
deps.data.search.registerSearchStrategyProvider(
this.initializerContext.opaqueId,
ASYNC_DEMO_SEARCH_STRATEGY,
asyncDemoSearchStrategyProvider
);
}
public start() {}

View file

@ -1,5 +1,5 @@
{
"id": "search_explorer",
"id": "searchExplorer",
"version": "0.0.1",
"kibanaVersion": "kibana",
"configPath": ["search_explorer"],

View file

@ -1,5 +1,5 @@
{
"name": "search_explorer",
"name": "searchExplorer",
"version": "1.0.0",
"main": "target/examples/search_explorer",
"kibana": {

View file

@ -32,6 +32,7 @@ import { AppMountParameters, CoreStart } from '../../../src/core/public';
import { EsSearchTest } from './es_strategy';
import { Page } from './page';
import { DemoStrategy } from './demo_strategy';
import { AsyncDemoStrategy } from './async_demo_strategy';
import { DocumentationPage } from './documentation';
import { SearchApiPage } from './search_api';
import { AppPluginStartDependencies, SearchBarComponentParams } from './types';
@ -94,6 +95,11 @@ const SearchApp = ({ basename, data, application }: SearchBarComponentParams) =>
id: 'demoSearch',
component: <DemoStrategy search={data.search.search} />,
},
{
title: 'Async demo search strategy',
id: 'asyncDemoSearch',
component: <AsyncDemoStrategy search={data.search.search} />,
},
];
const routes = pages.map((page, i) => (

View file

@ -0,0 +1,123 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import React from 'react';
import {
EuiPageContentBody,
EuiFormRow,
EuiFlexItem,
EuiFlexGroup,
EuiFieldNumber,
} from '@elastic/eui';
import { ISearchGeneric } from '../../../src/plugins/data/public';
import { DoSearch } from './do_search';
import { GuideSection } from './guide_section';
import { ASYNC_DEMO_SEARCH_STRATEGY, IAsyncDemoRequest } from '../../demo_search/common';
// @ts-ignore
import demoStrategyServerProvider from '!!raw-loader!./../../demo_search/server/async_demo_search_strategy';
// @ts-ignore
import demoStrategyPublicProvider from '!!raw-loader!./../../demo_search/public/async_demo_search_strategy';
// @ts-ignore
import demoStrategyServerPlugin from '!!raw-loader!./../../demo_search/server/plugin';
// @ts-ignore
import demoStrategyPublicPlugin from '!!raw-loader!./../../demo_search/public/plugin';
interface Props {
search: ISearchGeneric;
}
interface State {
searching: boolean;
fibonacciNumbers: number;
changes: boolean;
error?: any;
}
export class AsyncDemoStrategy extends React.Component<Props, State> {
constructor(props: Props) {
super(props);
this.state = {
searching: false,
changes: false,
fibonacciNumbers: 5,
};
}
renderDemo = () => {
const request: IAsyncDemoRequest = {
fibonacciNumbers: this.state.fibonacciNumbers,
};
return (
<React.Fragment>
<EuiFlexGroup>
<EuiFlexItem>
<EuiFormRow label="How many Fibonacci numbers to generate?">
<EuiFieldNumber
value={this.state.fibonacciNumbers}
onChange={e => this.setState({ fibonacciNumbers: parseFloat(e.target.value) })}
/>
</EuiFormRow>
</EuiFlexItem>
</EuiFlexGroup>
<DoSearch
request={request}
strategy={ASYNC_DEMO_SEARCH_STRATEGY}
search={(signal: AbortSignal) =>
this.props.search(request, { signal }, ASYNC_DEMO_SEARCH_STRATEGY)
}
/>
</React.Fragment>
);
};
render() {
return (
<EuiPageContentBody>
<GuideSection
codeSections={[
{
title: 'Public',
code: [
{ description: 'plugin.ts', snippet: demoStrategyPublicPlugin },
{
description: 'async_demo_search_strategy.ts',
snippet: demoStrategyPublicProvider,
},
],
},
{
title: 'Server',
code: [
{ description: 'plugin.ts', snippet: demoStrategyServerPlugin },
{
description: 'async_demo_search_strategy.ts',
snippet: demoStrategyServerProvider,
},
],
},
]}
demo={this.renderDemo()}
/>
</EuiPageContentBody>
);
}
}

View file

@ -118,8 +118,8 @@ ${requestStr}
<EuiFlexItem>
<EuiText>Response:</EuiText>
<EuiProgress
value={(this.state.response && this.state.response.percentComplete) || 0}
max={100}
value={this.state.response?.loaded ?? 0}
max={this.state.response?.total ?? 0}
/>
<EuiCodeBlock
language="json"

View file

@ -23,12 +23,6 @@ export interface IKibanaSearchResponse {
*/
id?: string;
/**
* If relevant to the search strategy, return a percentage
* that represents how progress is indicated.
*/
percentComplete?: number;
/**
* If relevant to the search strategy, return a total number
* that represents how progress is indicated.

View file

@ -26,6 +26,8 @@ import { ISearchContext, TSearchStrategyProvider, ISearchStrategy } from '../typ
export const esSearchStrategyProvider: TSearchStrategyProvider<typeof ES_SEARCH_STRATEGY> = (
context: ISearchContext
): ISearchStrategy<typeof ES_SEARCH_STRATEGY> => {
const syncStrategyProvider = context.getSearchStrategy(SYNC_SEARCH_STRATEGY);
const { search } = syncStrategyProvider(context);
return {
search: (request, options) => {
if (typeof request.params.preference === 'undefined') {
@ -33,11 +35,9 @@ export const esSearchStrategyProvider: TSearchStrategyProvider<typeof ES_SEARCH_
const customPreference = context.core.uiSettings.get('courier:customRequestPreference');
request.params.preference = getEsPreference(setPreference, customPreference);
}
const syncStrategyProvider = context.getSearchStrategy(SYNC_SEARCH_STRATEGY);
return syncStrategyProvider(context).search(
{ ...request, serverStrategy: ES_SEARCH_STRATEGY },
options
) as Observable<IEsSearchResponse>;
return search({ ...request, serverStrategy: ES_SEARCH_STRATEGY }, options) as Observable<
IEsSearchResponse
>;
},
};
};

View file

@ -35,7 +35,7 @@ export {
export { IEsSearchResponse, IEsSearchRequest, ES_SEARCH_STRATEGY } from '../../common/search';
export { SYNC_SEARCH_STRATEGY } from './sync_search_strategy';
export { ISyncSearchRequest, SYNC_SEARCH_STRATEGY } from './sync_search_strategy';
export { IKibanaSearchResponse, IKibanaSearchRequest } from '../../common/search';

View file

@ -25,7 +25,7 @@ import { DEFAULT_SEARCH_STRATEGY } from '../../common/search';
// let mockCoreSetup: MockedKeys<CoreSetup>;
const mockDefaultSearch = jest.fn(() => Promise.resolve({ percentComplete: 0 }));
const mockDefaultSearch = jest.fn(() => Promise.resolve({ total: 100, loaded: 0 }));
const mockDefaultSearchStrategyProvider = jest.fn(() =>
Promise.resolve({
search: mockDefaultSearch,

View file

@ -31,7 +31,7 @@ export function createApi({
}) {
const api: IRouteHandlerSearchContext = {
search: async (request, options, strategyName) => {
const name = strategyName ? strategyName : DEFAULT_SEARCH_STRATEGY;
const name = strategyName ?? DEFAULT_SEARCH_STRATEGY;
const strategyProvider = searchStrategies[name];
if (!strategyProvider) {
throw new Error(`No strategy found for ${strategyName}`);
@ -40,6 +40,15 @@ export function createApi({
const strategy = await strategyProvider(caller, api.search);
return strategy.search(request, options);
},
cancel: async (id, strategyName) => {
const name = strategyName ?? DEFAULT_SEARCH_STRATEGY;
const strategyProvider = searchStrategies[name];
if (!strategyProvider) {
throw new Error(`No strategy found for ${strategyName}`);
}
const strategy = await strategyProvider(caller, api.search);
return strategy.cancel && strategy.cancel(id);
},
};
return api;
}

View file

@ -17,8 +17,9 @@
* under the License.
*/
import { ISearchGeneric } from './i_search';
import { ISearchGeneric, ICancelGeneric } from './i_search';
export interface IRouteHandlerSearchContext {
search: ISearchGeneric;
cancel: ICancelGeneric;
}

View file

@ -42,7 +42,14 @@ export type ISearchGeneric = <T extends TStrategyTypes = typeof ES_SEARCH_STRATE
strategy?: T
) => Promise<IResponseTypesMap[T]>;
export type ICancelGeneric = <T extends TStrategyTypes = typeof ES_SEARCH_STRATEGY>(
id: string,
strategy?: T
) => Promise<void>;
export type ISearch<T extends TStrategyTypes> = (
request: IRequestTypesMap[T],
options?: ISearchOptions
) => Promise<IResponseTypesMap[T]>;
export type ICancel<T extends TStrategyTypes> = (id: string) => Promise<void>;

View file

@ -18,7 +18,7 @@
*/
import { APICaller } from 'kibana/server';
import { ISearch, ISearchGeneric } from './i_search';
import { ISearch, ICancel, ISearchGeneric } from './i_search';
import { TStrategyTypes } from './strategy_types';
import { ISearchContext } from './i_search_context';
@ -28,6 +28,7 @@ import { ISearchContext } from './i_search_context';
*/
export interface ISearchStrategy<T extends TStrategyTypes> {
search: ISearch<T>;
cancel?: ICancel<T>;
}
/**

View file

@ -35,7 +35,7 @@ export function registerSearchRoute(router: IRouter): void {
},
async (context, request, res) => {
const searchRequest = request.body;
const strategy = request.params.strategy;
const { strategy } = request.params;
const signal = getRequestAbortedSignal(request.events.aborted$);
try {
@ -54,4 +54,35 @@ export function registerSearchRoute(router: IRouter): void {
}
}
);
router.delete(
{
path: '/internal/search/{strategy}/{id}',
validate: {
params: schema.object({
strategy: schema.string(),
id: schema.string(),
}),
query: schema.object({}, { allowUnknowns: true }),
},
},
async (context, request, res) => {
const { strategy, id } = request.params;
try {
await context.search!.cancel(id, strategy);
return res.ok();
} catch (err) {
return res.customError({
statusCode: err.statusCode,
body: {
message: err.message,
attributes: {
error: err.body.error,
},
},
});
}
}
);
}

View file

@ -9,3 +9,5 @@ import { DataEnhancedPlugin, DataEnhancedSetup, DataEnhancedStart } from './plug
export const plugin = () => new DataEnhancedPlugin();
export { DataEnhancedSetup, DataEnhancedStart };
export { ASYNC_SEARCH_STRATEGY, IAsyncSearchRequest, IAsyncSearchOptions } from './search';

View file

@ -8,6 +8,7 @@ import { CoreSetup, CoreStart, Plugin } from 'src/core/public';
import { DataPublicPluginSetup, DataPublicPluginStart } from '../../../../src/plugins/data/public';
import { setAutocompleteService } from './services';
import { setupKqlQuerySuggestionProvider, KUERY_LANGUAGE_NAME } from './autocomplete';
import { ASYNC_SEARCH_STRATEGY, asyncSearchStrategyProvider } from './search';
export interface DataEnhancedSetupDependencies {
data: DataPublicPluginSetup;
@ -20,11 +21,14 @@ export type DataEnhancedSetup = ReturnType<DataEnhancedPlugin['setup']>;
export type DataEnhancedStart = ReturnType<DataEnhancedPlugin['start']>;
export class DataEnhancedPlugin implements Plugin {
public setup(core: CoreSetup, plugins: DataEnhancedSetupDependencies) {
plugins.data.autocomplete.addQuerySuggestionProvider(
constructor() {}
public setup(core: CoreSetup, { data }: DataEnhancedSetupDependencies) {
data.autocomplete.addQuerySuggestionProvider(
KUERY_LANGUAGE_NAME,
setupKqlQuerySuggestionProvider(core)
);
data.search.registerSearchStrategyProvider(ASYNC_SEARCH_STRATEGY, asyncSearchStrategyProvider);
}
public start(core: CoreStart, plugins: DataEnhancedStartDependencies) {

View file

@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { of } from 'rxjs';
import { AbortController } from 'abort-controller';
import { coreMock } from '../../../../../src/core/public/mocks';
import { asyncSearchStrategyProvider } from './async_search_strategy';
import { IAsyncSearchOptions } from './types';
import { CoreStart } from 'kibana/public';
describe('Async search strategy', () => {
let mockCoreStart: MockedKeys<CoreStart>;
const mockSearch = jest.fn();
const mockRequest = { params: {}, serverStrategy: 'foo' };
const mockOptions: IAsyncSearchOptions = { pollInterval: 0 };
beforeEach(() => {
mockCoreStart = coreMock.createStart();
mockSearch.mockReset();
});
it('only sends one request if the first response is complete', async () => {
mockSearch.mockReturnValueOnce(of({ id: 1, total: 1, loaded: 1 }));
const asyncSearch = asyncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
});
await asyncSearch.search(mockRequest, mockOptions).toPromise();
expect(mockSearch.mock.calls[0][0]).toEqual(mockRequest);
expect(mockSearch.mock.calls[0][1]).toEqual({});
expect(mockSearch).toBeCalledTimes(1);
});
it('stops polling when the response is complete', async () => {
mockSearch
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));
const asyncSearch = asyncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
});
expect(mockSearch).toBeCalledTimes(0);
await asyncSearch.search(mockRequest, mockOptions).toPromise();
expect(mockSearch).toBeCalledTimes(2);
});
it('only sends the ID and server strategy after the first request', async () => {
mockSearch
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));
const asyncSearch = asyncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
});
expect(mockSearch).toBeCalledTimes(0);
await asyncSearch.search(mockRequest, mockOptions).toPromise();
expect(mockSearch).toBeCalledTimes(2);
expect(mockSearch.mock.calls[0][0]).toEqual(mockRequest);
expect(mockSearch.mock.calls[1][0]).toEqual({ id: 1, serverStrategy: 'foo' });
});
it('sends a DELETE request and stops polling when the signal is aborted', async () => {
mockSearch
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));
const asyncSearch = asyncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
});
const abortController = new AbortController();
const options = { ...mockOptions, signal: abortController.signal };
const promise = asyncSearch.search(mockRequest, options).toPromise();
abortController.abort();
try {
await promise;
} catch (e) {
expect(e.name).toBe('AbortError');
expect(mockSearch).toBeCalledTimes(1);
expect(mockCoreStart.http.delete).toBeCalled();
}
});
});

View file

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { EMPTY, fromEvent, NEVER, Observable, throwError, timer } from 'rxjs';
import { mergeMap, expand, takeUntil } from 'rxjs/operators';
import {
IKibanaSearchResponse,
ISearchContext,
ISearchStrategy,
SYNC_SEARCH_STRATEGY,
TSearchStrategyProvider,
} from '../../../../../src/plugins/data/public';
import { IAsyncSearchRequest, IAsyncSearchOptions } from './types';
export const ASYNC_SEARCH_STRATEGY = 'ASYNC_SEARCH_STRATEGY';
declare module '../../../../../src/plugins/data/public' {
export interface IRequestTypesMap {
[ASYNC_SEARCH_STRATEGY]: IAsyncSearchRequest;
}
}
export const asyncSearchStrategyProvider: TSearchStrategyProvider<typeof ASYNC_SEARCH_STRATEGY> = (
context: ISearchContext
): ISearchStrategy<typeof ASYNC_SEARCH_STRATEGY> => {
const syncStrategyProvider = context.getSearchStrategy(SYNC_SEARCH_STRATEGY);
const { search } = syncStrategyProvider(context);
return {
search: (
request: IAsyncSearchRequest,
{ pollInterval = 1000, ...options }: IAsyncSearchOptions = {}
): Observable<IKibanaSearchResponse> => {
const { serverStrategy } = request;
let id: string | undefined = request.id;
const aborted$ = options.signal
? fromEvent(options.signal, 'abort').pipe(
mergeMap(() => {
// If we haven't received the response to the initial request, including the ID, then
// we don't need to send a follow-up request to delete this search. Otherwise, we
// send the follow-up request to delete this search, then throw an abort error.
if (id !== undefined) {
context.core.http.delete(`/internal/search/${request.serverStrategy}/${id}`);
}
const error = new Error('Aborted');
error.name = 'AbortError';
return throwError(error);
})
)
: NEVER;
return search(request, options).pipe(
expand(response => {
// If the response indicates it is complete, stop polling and complete the observable
if ((response.loaded ?? 0) >= (response.total ?? 0)) return EMPTY;
id = response.id;
// Delay by the given poll interval
return timer(pollInterval).pipe(
// Send future requests using just the ID from the response
mergeMap(() => {
return search({ id, serverStrategy }, options);
})
);
}),
takeUntil(aborted$)
);
},
};
};

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { ASYNC_SEARCH_STRATEGY, asyncSearchStrategyProvider } from './async_search_strategy';
export { IAsyncSearchRequest, IAsyncSearchOptions } from './types';

View file

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ISearchOptions, ISyncSearchRequest } from '../../../../../src/plugins/data/public';
export interface IAsyncSearchRequest extends ISyncSearchRequest {
/**
* The ID received from the response from the initial request
*/
id?: string;
}
export interface IAsyncSearchOptions extends ISearchOptions {
/**
* The number of milliseconds to wait between receiving a response and sending another request
*/
pollInterval?: number;
}