[Step 1] use Observables on server search API (#79874)

* use Observables on server search API

* fix PR comments

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alexey Antonov 2020-10-13 16:47:23 +03:00 committed by GitHub
parent e0bb8605b4
commit 1d1c3c7ef3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 419 additions and 328 deletions

View file

@ -16,6 +16,6 @@ export interface ISearchStart<SearchStrategyRequest extends IKibanaSearchRequest
| --- | --- | --- |
| [aggs](./kibana-plugin-plugins-data-server.isearchstart.aggs.md) | <code>AggsStart</code> | |
| [getSearchStrategy](./kibana-plugin-plugins-data-server.isearchstart.getsearchstrategy.md) | <code>(name: string) =&gt; ISearchStrategy&lt;SearchStrategyRequest, SearchStrategyResponse&gt;</code> | Get other registered search strategies. For example, if a new strategy needs to use the already-registered ES search strategy, it can use this function to accomplish that. |
| [search](./kibana-plugin-plugins-data-server.isearchstart.search.md) | <code>(context: RequestHandlerContext, request: SearchStrategyRequest, options: ISearchOptions) =&gt; Promise&lt;SearchStrategyResponse&gt;</code> | |
| [search](./kibana-plugin-plugins-data-server.isearchstart.search.md) | <code>ISearchStrategy['search']</code> | |
| [searchSource](./kibana-plugin-plugins-data-server.isearchstart.searchsource.md) | <code>{</code><br/><code> asScoped: (request: KibanaRequest) =&gt; Promise&lt;ISearchStartSearchSource&gt;;</code><br/><code> }</code> | |

View file

@ -7,5 +7,5 @@
<b>Signature:</b>
```typescript
search: (context: RequestHandlerContext, request: SearchStrategyRequest, options: ISearchOptions) => Promise<SearchStrategyResponse>;
search: ISearchStrategy['search'];
```

View file

@ -17,5 +17,5 @@ export interface ISearchStrategy<SearchStrategyRequest extends IKibanaSearchRequ
| Property | Type | Description |
| --- | --- | --- |
| [cancel](./kibana-plugin-plugins-data-server.isearchstrategy.cancel.md) | <code>(context: RequestHandlerContext, id: string) =&gt; Promise&lt;void&gt;</code> | |
| [search](./kibana-plugin-plugins-data-server.isearchstrategy.search.md) | <code>(context: RequestHandlerContext, request: SearchStrategyRequest, options?: ISearchOptions) =&gt; Promise&lt;SearchStrategyResponse&gt;</code> | |
| [search](./kibana-plugin-plugins-data-server.isearchstrategy.search.md) | <code>(request: SearchStrategyRequest, options: ISearchOptions, context: RequestHandlerContext) =&gt; Observable&lt;SearchStrategyResponse&gt;</code> | |

View file

@ -7,5 +7,5 @@
<b>Signature:</b>
```typescript
search: (context: RequestHandlerContext, request: SearchStrategyRequest, options?: ISearchOptions) => Promise<SearchStrategyResponse>;
search: (request: SearchStrategyRequest, options: ISearchOptions, context: RequestHandlerContext) => Observable<SearchStrategyResponse>;
```

View file

@ -17,6 +17,7 @@
* under the License.
*/
import { map } from 'rxjs/operators';
import { ISearchStrategy, PluginStart } from '../../../src/plugins/data/server';
import { IMyStrategyResponse, IMyStrategyRequest } from '../common';
@ -25,13 +26,13 @@ export const mySearchStrategyProvider = (
): ISearchStrategy<IMyStrategyRequest, IMyStrategyResponse> => {
const es = data.search.getSearchStrategy('es');
return {
search: async (context, request, options): Promise<IMyStrategyResponse> => {
const esSearchRes = await es.search(context, request, options);
return {
...esSearchRes,
cool: request.get_cool ? 'YES' : 'NOPE',
};
},
search: (request, options, context) =>
es.search(request, options, context).pipe(
map((esSearchRes) => ({
...esSearchRes,
cool: request.get_cool ? 'YES' : 'NOPE',
}))
),
cancel: async (context, id) => {
if (es.cancel) {
es.cancel(context, id);

View file

@ -39,26 +39,28 @@ export function registerServerSearchRoute(router: IRouter, data: DataPluginStart
// Run a synchronous search server side, by enforcing a high keepalive and waiting for completion.
// If you wish to run the search with polling (in basic+), you'd have to poll on the search API.
// Please reach out to the @app-arch-team if you need this to be implemented.
const res = await data.search.search(
context,
{
params: {
index,
body: {
aggs: {
'1': {
avg: {
field,
const res = await data.search
.search(
{
params: {
index,
body: {
aggs: {
'1': {
avg: {
field,
},
},
},
},
waitForCompletionTimeout: '5m',
keepAlive: '5m',
},
waitForCompletionTimeout: '5m',
keepAlive: '5m',
},
} as IEsSearchRequest,
{}
);
} as IEsSearchRequest,
{},
context
)
.toPromise();
return response.ok({
body: {

View file

@ -127,7 +127,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
request: SearchStrategyRequest,
options: ISearchOptions
) => {
return search(request, options).toPromise() as Promise<SearchStrategyResponse>;
return search<SearchStrategyRequest, SearchStrategyResponse>(request, options).toPromise();
},
onResponse: handleResponse,
legacy: {

View file

@ -35,7 +35,8 @@ describe('ES search strategy', () => {
},
},
});
const mockContext = {
const mockContext = ({
core: {
uiSettings: {
client: {
@ -44,7 +45,8 @@ describe('ES search strategy', () => {
},
elasticsearch: { client: { asCurrentUser: { search: mockApiCaller } } },
},
};
} as unknown) as RequestHandlerContext;
const mockConfig$ = pluginInitializerContextConfigMock<any>({}).legacy.globalConfig$;
beforeEach(() => {
@ -57,44 +59,51 @@ describe('ES search strategy', () => {
expect(typeof esSearch.search).toBe('function');
});
it('calls the API caller with the params with defaults', async () => {
it('calls the API caller with the params with defaults', async (done) => {
const params = { index: 'logstash-*' };
const esSearch = await esSearchStrategyProvider(mockConfig$, mockLogger);
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toEqual({
...params,
ignore_unavailable: true,
track_total_hits: true,
});
await esSearchStrategyProvider(mockConfig$, mockLogger)
.search({ params }, {}, mockContext)
.subscribe(() => {
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toEqual({
...params,
ignore_unavailable: true,
track_total_hits: true,
});
done();
});
});
it('calls the API caller with overridden defaults', async () => {
it('calls the API caller with overridden defaults', async (done) => {
const params = { index: 'logstash-*', ignore_unavailable: false, timeout: '1000ms' };
const esSearch = await esSearchStrategyProvider(mockConfig$, mockLogger);
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toEqual({
...params,
track_total_hits: true,
});
await esSearchStrategyProvider(mockConfig$, mockLogger)
.search({ params }, {}, mockContext)
.subscribe(() => {
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toEqual({
...params,
track_total_hits: true,
});
done();
});
});
it('has all response parameters', async () => {
const params = { index: 'logstash-*' };
const esSearch = await esSearchStrategyProvider(mockConfig$, mockLogger);
const response = await esSearch.search((mockContext as unknown) as RequestHandlerContext, {
params,
});
expect(response.isRunning).toBe(false);
expect(response.isPartial).toBe(false);
expect(response).toHaveProperty('loaded');
expect(response).toHaveProperty('rawResponse');
});
it('has all response parameters', async (done) =>
await esSearchStrategyProvider(mockConfig$, mockLogger)
.search(
{
params: { index: 'logstash-*' },
},
{},
mockContext
)
.subscribe((data) => {
expect(data.isRunning).toBe(false);
expect(data.isPartial).toBe(false);
expect(data).toHaveProperty('loaded');
expect(data).toHaveProperty('rawResponse');
done();
}));
});

View file

@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, from } from 'rxjs';
import { first } from 'rxjs/operators';
import { SharedGlobalConfig, Logger } from 'kibana/server';
import { SearchResponse } from 'elasticsearch';
import { Observable } from 'rxjs';
import { ApiResponse } from '@elastic/elasticsearch';
import { SearchUsage } from '../collectors/usage';
import { toSnakeCase } from './to_snake_case';
@ -29,6 +29,7 @@ import {
getTotalLoaded,
getShardTimeout,
shimAbortSignal,
IEsSearchResponse,
} from '..';
export const esSearchStrategyProvider = (
@ -37,47 +38,52 @@ export const esSearchStrategyProvider = (
usage?: SearchUsage
): ISearchStrategy => {
return {
search: async (context, request, options) => {
logger.debug(`search ${request.params?.index}`);
const config = await config$.pipe(first()).toPromise();
const uiSettingsClient = await context.core.uiSettings.client;
search: (request, options, context) =>
from(
new Promise<IEsSearchResponse>(async (resolve, reject) => {
logger.debug(`search ${request.params?.index}`);
const config = await config$.pipe(first()).toPromise();
const uiSettingsClient = await context.core.uiSettings.client;
// Only default index pattern type is supported here.
// See data_enhanced for other type support.
if (!!request.indexType) {
throw new Error(`Unsupported index pattern type ${request.indexType}`);
}
// Only default index pattern type is supported here.
// See data_enhanced for other type support.
if (!!request.indexType) {
throw new Error(`Unsupported index pattern type ${request.indexType}`);
}
// ignoreThrottled is not supported in OSS
const { ignoreThrottled, ...defaultParams } = await getDefaultSearchParams(uiSettingsClient);
// ignoreThrottled is not supported in OSS
const { ignoreThrottled, ...defaultParams } = await getDefaultSearchParams(
uiSettingsClient
);
const params = toSnakeCase({
...defaultParams,
...getShardTimeout(config),
...request.params,
});
const params = toSnakeCase({
...defaultParams,
...getShardTimeout(config),
...request.params,
});
try {
const promise = shimAbortSignal(
context.core.elasticsearch.client.asCurrentUser.search(params),
options?.abortSignal
);
const { body: rawResponse } = (await promise) as ApiResponse<SearchResponse<any>>;
try {
const promise = shimAbortSignal(
context.core.elasticsearch.client.asCurrentUser.search(params),
options?.abortSignal
);
const { body: rawResponse } = (await promise) as ApiResponse<SearchResponse<any>>;
if (usage) usage.trackSuccess(rawResponse.took);
if (usage) usage.trackSuccess(rawResponse.took);
// The above query will either complete or timeout and throw an error.
// There is no progress indication on this api.
return {
isPartial: false,
isRunning: false,
rawResponse,
...getTotalLoaded(rawResponse._shards),
};
} catch (e) {
if (usage) usage.trackError();
throw e;
}
},
// The above query will either complete or timeout and throw an error.
// There is no progress indication on this api.
resolve({
isPartial: false,
isRunning: false,
rawResponse,
...getTotalLoaded(rawResponse._shards),
});
} catch (e) {
if (usage) usage.trackError();
reject(e);
}
})
),
};
};

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { Observable } from 'rxjs';
import { Observable, from } from 'rxjs';
import {
CoreSetup,
@ -66,7 +66,8 @@ describe('Search service', () => {
},
},
};
mockDataStart.search.search.mockResolvedValue(response);
mockDataStart.search.search.mockReturnValue(from(Promise.resolve(response)));
const mockContext = {};
const mockBody = { id: undefined, params: {} };
const mockParams = { strategy: 'foo' };
@ -83,7 +84,7 @@ describe('Search service', () => {
await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse);
expect(mockDataStart.search.search).toBeCalled();
expect(mockDataStart.search.search.mock.calls[0][1]).toStrictEqual(mockBody);
expect(mockDataStart.search.search.mock.calls[0][0]).toStrictEqual(mockBody);
expect(mockResponse.ok).toBeCalled();
expect(mockResponse.ok.mock.calls[0][0]).toEqual({
body: response,
@ -91,12 +92,16 @@ describe('Search service', () => {
});
it('handler throws an error if the search throws an error', async () => {
mockDataStart.search.search.mockRejectedValue({
message: 'oh no',
body: {
error: 'oops',
},
});
const rejectedValue = from(
Promise.reject({
message: 'oh no',
body: {
error: 'oops',
},
})
);
mockDataStart.search.search.mockReturnValue(rejectedValue);
const mockContext = {};
const mockBody = { id: undefined, params: {} };
@ -114,7 +119,7 @@ describe('Search service', () => {
await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse);
expect(mockDataStart.search.search).toBeCalled();
expect(mockDataStart.search.search.mock.calls[0][1]).toStrictEqual(mockBody);
expect(mockDataStart.search.search.mock.calls[0][0]).toStrictEqual(mockBody);
expect(mockResponse.customError).toBeCalled();
const error: any = mockResponse.customError.mock.calls[0][0];
expect(error.body.message).toBe('oh no');

View file

@ -49,14 +49,16 @@ export function registerSearchRoute(
const [, , selfStart] = await getStartServices();
try {
const response = await selfStart.search.search(
context,
{ ...searchRequest, id },
{
abortSignal,
strategy,
}
);
const response = await selfStart.search
.search(
{ ...searchRequest, id },
{
abortSignal,
strategy,
},
context
)
.toPromise();
return res.ok({
body: {

View file

@ -49,10 +49,10 @@ import {
IKibanaSearchResponse,
IEsSearchRequest,
IEsSearchResponse,
ISearchOptions,
SearchSourceDependencies,
SearchSourceService,
searchSourceRequiredUiSettings,
ISearchOptions,
} from '../../common/search';
import {
getShardDelayBucketAgg,
@ -151,13 +151,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
return {
aggs: this.aggsService.start({ fieldFormats, uiSettings }),
getSearchStrategy: this.getSearchStrategy,
search: (
context: RequestHandlerContext,
searchRequest: IKibanaSearchRequest,
options: Record<string, any>
) => {
return this.search(context, searchRequest, options);
},
search: this.search.bind(this),
searchSource: {
asScoped: async (request: KibanaRequest) => {
const esClient = elasticsearch.client.asScoped(request);
@ -175,7 +169,13 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
const searchSourceDependencies: SearchSourceDependencies = {
getConfig: <T = any>(key: string): T => uiSettingsCache[key],
search: (searchRequest, options) => {
search: <
SearchStrategyRequest extends IKibanaSearchRequest = IEsSearchRequest,
SearchStrategyResponse extends IKibanaSearchResponse = IEsSearchResponse
>(
searchStrategyRequest: SearchStrategyRequest,
options: ISearchOptions
) => {
/**
* Unless we want all SearchSource users to provide both a KibanaRequest
* (needed for index patterns) AND the RequestHandlerContext (needed for
@ -195,7 +195,12 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
},
},
} as RequestHandlerContext;
return this.search(fakeRequestHandlerContext, searchRequest, options);
return this.search<SearchStrategyRequest, SearchStrategyResponse>(
searchStrategyRequest,
options,
fakeRequestHandlerContext
).toPromise();
},
// onResponse isn't used on the server, so we just return the original value
onResponse: (req, res) => res,
@ -234,13 +239,15 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
SearchStrategyRequest extends IKibanaSearchRequest = IEsSearchRequest,
SearchStrategyResponse extends IKibanaSearchResponse = IEsSearchResponse
>(
context: RequestHandlerContext,
searchRequest: SearchStrategyRequest,
options: ISearchOptions
): Promise<SearchStrategyResponse> => {
return this.getSearchStrategy<SearchStrategyRequest, SearchStrategyResponse>(
options: ISearchOptions,
context: RequestHandlerContext
) => {
const strategy = this.getSearchStrategy<SearchStrategyRequest, SearchStrategyResponse>(
options.strategy || this.defaultSearchStrategyName
).search(context, searchRequest, options);
);
return strategy.search(searchRequest, options, context);
};
private getSearchStrategy = <

View file

@ -17,6 +17,7 @@
* under the License.
*/
import { Observable } from 'rxjs';
import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
import {
ISearchOptions,
@ -57,6 +58,22 @@ export interface ISearchSetup {
__enhance: (enhancements: SearchEnhancements) => void;
}
/**
* Search strategy interface contains a search method that takes in a request and returns a promise
* that resolves to a response.
*/
export interface ISearchStrategy<
SearchStrategyRequest extends IKibanaSearchRequest = IEsSearchRequest,
SearchStrategyResponse extends IKibanaSearchResponse = IEsSearchResponse
> {
search: (
request: SearchStrategyRequest,
options: ISearchOptions,
context: RequestHandlerContext
) => Observable<SearchStrategyResponse>;
cancel?: (context: RequestHandlerContext, id: string) => Promise<void>;
}
export interface ISearchStart<
SearchStrategyRequest extends IKibanaSearchRequest = IEsSearchRequest,
SearchStrategyResponse extends IKibanaSearchResponse = IEsSearchResponse
@ -69,28 +86,8 @@ export interface ISearchStart<
getSearchStrategy: (
name: string
) => ISearchStrategy<SearchStrategyRequest, SearchStrategyResponse>;
search: (
context: RequestHandlerContext,
request: SearchStrategyRequest,
options: ISearchOptions
) => Promise<SearchStrategyResponse>;
search: ISearchStrategy['search'];
searchSource: {
asScoped: (request: KibanaRequest) => Promise<ISearchStartSearchSource>;
};
}
/**
* Search strategy interface contains a search method that takes in a request and returns a promise
* that resolves to a response.
*/
export interface ISearchStrategy<
SearchStrategyRequest extends IKibanaSearchRequest = IEsSearchRequest,
SearchStrategyResponse extends IKibanaSearchResponse = IEsSearchResponse
> {
search: (
context: RequestHandlerContext,
request: SearchStrategyRequest,
options?: ISearchOptions
) => Promise<SearchStrategyResponse>;
cancel?: (context: RequestHandlerContext, id: string) => Promise<void>;
}

View file

@ -713,7 +713,7 @@ export interface ISearchStart<SearchStrategyRequest extends IKibanaSearchRequest
aggs: AggsStart;
getSearchStrategy: (name: string) => ISearchStrategy<SearchStrategyRequest, SearchStrategyResponse>;
// (undocumented)
search: (context: RequestHandlerContext, request: SearchStrategyRequest, options: ISearchOptions) => Promise<SearchStrategyResponse>;
search: ISearchStrategy['search'];
// (undocumented)
searchSource: {
asScoped: (request: KibanaRequest) => Promise<ISearchStartSearchSource>;
@ -727,7 +727,7 @@ export interface ISearchStrategy<SearchStrategyRequest extends IKibanaSearchRequ
// (undocumented)
cancel?: (context: RequestHandlerContext, id: string) => Promise<void>;
// (undocumented)
search: (context: RequestHandlerContext, request: SearchStrategyRequest, options?: ISearchOptions) => Promise<SearchStrategyResponse>;
search: (request: SearchStrategyRequest, options: ISearchOptions, context: RequestHandlerContext) => Observable<SearchStrategyResponse>;
}
// @public (undocumented)
@ -1140,7 +1140,7 @@ export function usageProvider(core: CoreSetup_2): SearchUsage;
// src/plugins/data/server/index.ts:254:1 - (ae-forgotten-export) The symbol "toAbsoluteDates" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index_patterns/index_patterns_service.ts:50:14 - (ae-forgotten-export) The symbol "IndexPatternsService" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/plugin.ts:88:66 - (ae-forgotten-export) The symbol "DataEnhancements" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/search/types.ts:78:5 - (ae-forgotten-export) The symbol "ISearchStartSearchSource" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/search/types.ts:91:5 - (ae-forgotten-export) The symbol "ISearchStartSearchSource" needs to be exported by the entry point index.d.ts
// (No @packageDocumentation comment for this package)

View file

@ -57,10 +57,17 @@ export function validateEsRoute(router: IRouter, core: CoreSetup) {
let resp;
try {
resp = await deps.data.search.search(context, body, {
strategy: ES_SEARCH_STRATEGY,
});
resp = resp.rawResponse;
resp = (
await deps.data.search
.search(
body,
{
strategy: ES_SEARCH_STRATEGY,
},
context
)
.toPromise()
).rawResponse;
} catch (errResp) {
resp = errResp;
}

View file

@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
import { from } from 'rxjs';
import es from './index';
import tlConfigFn from '../fixtures/tl_config';
import * as aggResponse from './lib/agg_response_to_series_list';
import buildRequest from './lib/build_request';
@ -36,7 +36,10 @@ function stubRequestAndServer(response, indexPatternSavedObjects = []) {
getStartServices: sinon
.stub()
.returns(
Promise.resolve([{}, { data: { search: { search: () => Promise.resolve(response) } } }])
Promise.resolve([
{},
{ data: { search: { search: () => from(Promise.resolve(response)) } } },
])
),
savedObjectsClient: {
find: function () {

View file

@ -132,9 +132,15 @@ export default new Datasource('es', {
const deps = (await tlConfig.getStartServices())[1];
const resp = await deps.data.search.search(tlConfig.context, body, {
strategy: ES_SEARCH_STRATEGY,
});
const resp = await deps.data.search
.search(
body,
{
strategy: ES_SEARCH_STRATEGY,
},
tlConfig.context
)
.toPromise();
if (!resp.rawResponse._shards.total) {
throw new Error(

View file

@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
import { from } from 'rxjs';
import { AbstractSearchStrategy } from './abstract_search_strategy';
describe('AbstractSearchStrategy', () => {
@ -55,7 +56,7 @@ describe('AbstractSearchStrategy', () => {
test('should return response', async () => {
const searches = [{ body: 'body', index: 'index' }];
const searchFn = jest.fn().mockReturnValue(Promise.resolve({}));
const searchFn = jest.fn().mockReturnValue(from(Promise.resolve({})));
const responses = await abstractSearchStrategy.search(
{
@ -82,7 +83,6 @@ describe('AbstractSearchStrategy', () => {
expect(responses).toEqual([{}]);
expect(searchFn).toHaveBeenCalledWith(
{},
{
params: {
body: 'body',
@ -92,7 +92,8 @@ describe('AbstractSearchStrategy', () => {
},
{
strategy: 'es',
}
},
{}
);
});
});

View file

@ -60,20 +60,22 @@ export class AbstractSearchStrategy {
const requests: any[] = [];
bodies.forEach((body) => {
requests.push(
deps.data.search.search(
req.requestContext,
{
params: {
...body,
...this.additionalParams,
deps.data.search
.search(
{
params: {
...body,
...this.additionalParams,
},
indexType: this.indexType,
},
indexType: this.indexType,
},
{
...options,
strategy: this.searchStrategyName,
}
)
{
...options,
strategy: this.searchStrategyName,
},
req.requestContext
)
.toPromise()
);
});
return Promise.all(requests);

View file

@ -82,7 +82,7 @@ describe('EQL search strategy', () => {
describe('async functionality', () => {
it('performs an eql client search with params when no ID is provided', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockLogger);
await eqlSearch.search(mockContext, { options, params });
await eqlSearch.search({ options, params }, {}, mockContext).toPromise();
const [[request, requestOptions]] = mockEqlSearch.mock.calls;
expect(request.index).toEqual('logstash-*');
@ -92,7 +92,7 @@ describe('EQL search strategy', () => {
it('retrieves the current request if an id is provided', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockLogger);
await eqlSearch.search(mockContext, { id: 'my-search-id' });
await eqlSearch.search({ id: 'my-search-id' }, {}, mockContext).toPromise();
const [[requestParams]] = mockEqlGet.mock.calls;
expect(mockEqlSearch).not.toHaveBeenCalled();
@ -103,7 +103,7 @@ describe('EQL search strategy', () => {
describe('arguments', () => {
it('sends along async search options', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockLogger);
await eqlSearch.search(mockContext, { options, params });
await eqlSearch.search({ options, params }, {}, mockContext).toPromise();
const [[request]] = mockEqlSearch.mock.calls;
expect(request).toEqual(
@ -116,7 +116,7 @@ describe('EQL search strategy', () => {
it('sends along default search parameters', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockLogger);
await eqlSearch.search(mockContext, { options, params });
await eqlSearch.search({ options, params }, {}, mockContext).toPromise();
const [[request]] = mockEqlSearch.mock.calls;
expect(request).toEqual(
@ -129,14 +129,20 @@ describe('EQL search strategy', () => {
it('allows search parameters to be overridden', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockLogger);
await eqlSearch.search(mockContext, {
options,
params: {
...params,
wait_for_completion_timeout: '5ms',
keep_on_completion: false,
},
});
await eqlSearch
.search(
{
options,
params: {
...params,
wait_for_completion_timeout: '5ms',
keep_on_completion: false,
},
},
{},
mockContext
)
.toPromise();
const [[request]] = mockEqlSearch.mock.calls;
expect(request).toEqual(
@ -150,10 +156,16 @@ describe('EQL search strategy', () => {
it('allows search options to be overridden', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockLogger);
await eqlSearch.search(mockContext, {
options: { ...options, maxRetries: 2, ignore: [300] },
params,
});
await eqlSearch
.search(
{
options: { ...options, maxRetries: 2, ignore: [300] },
params,
},
{},
mockContext
)
.toPromise();
const [[, requestOptions]] = mockEqlSearch.mock.calls;
expect(requestOptions).toEqual(
@ -166,7 +178,9 @@ describe('EQL search strategy', () => {
it('passes transport options for an existing request', async () => {
const eqlSearch = await eqlSearchStrategyProvider(mockLogger);
await eqlSearch.search(mockContext, { id: 'my-search-id', options: { ignore: [400] } });
await eqlSearch
.search({ id: 'my-search-id', options: { ignore: [400] } }, {}, mockContext)
.toPromise();
const [[, requestOptions]] = mockEqlGet.mock.calls;
expect(mockEqlSearch).not.toHaveBeenCalled();

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { from } from 'rxjs';
import { Logger } from 'kibana/server';
import { ApiResponse, TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
@ -26,48 +27,51 @@ export const eqlSearchStrategyProvider = (
id,
});
},
search: async (context, request, options) => {
logger.debug(`_eql/search ${JSON.stringify(request.params) || request.id}`);
let promise: TransportRequestPromise<ApiResponse>;
const eqlClient = context.core.elasticsearch.client.asCurrentUser.eql;
const uiSettingsClient = await context.core.uiSettings.client;
const asyncOptions = getAsyncOptions();
const searchOptions = toSnakeCase({ ...request.options });
search: (request, options, context) =>
from(
new Promise<EqlSearchStrategyResponse>(async (resolve) => {
logger.debug(`_eql/search ${JSON.stringify(request.params) || request.id}`);
let promise: TransportRequestPromise<ApiResponse>;
const eqlClient = context.core.elasticsearch.client.asCurrentUser.eql;
const uiSettingsClient = await context.core.uiSettings.client;
const asyncOptions = getAsyncOptions();
const searchOptions = toSnakeCase({ ...request.options });
if (request.id) {
promise = eqlClient.get(
{
id: request.id,
...toSnakeCase(asyncOptions),
},
searchOptions
);
} else {
const { ignoreThrottled, ignoreUnavailable } = await getDefaultSearchParams(
uiSettingsClient
);
const searchParams = toSnakeCase({
ignoreThrottled,
ignoreUnavailable,
...asyncOptions,
...request.params,
});
if (request.id) {
promise = eqlClient.get(
{
id: request.id,
...toSnakeCase(asyncOptions),
},
searchOptions
);
} else {
const { ignoreThrottled, ignoreUnavailable } = await getDefaultSearchParams(
uiSettingsClient
);
const searchParams = toSnakeCase({
ignoreThrottled,
ignoreUnavailable,
...asyncOptions,
...request.params,
});
promise = eqlClient.search(
searchParams as EqlSearchStrategyRequest['params'],
searchOptions
);
}
promise = eqlClient.search(
searchParams as EqlSearchStrategyRequest['params'],
searchOptions
);
}
const rawResponse = await shimAbortSignal(promise, options?.abortSignal);
const { id, is_partial: isPartial, is_running: isRunning } = rawResponse.body;
const rawResponse = await shimAbortSignal(promise, options?.abortSignal);
const { id, is_partial: isPartial, is_running: isRunning } = rawResponse.body;
return {
id,
isPartial,
isRunning,
rawResponse,
};
},
resolve({
id,
isPartial,
isRunning,
rawResponse,
});
})
),
};
};

View file

@ -86,7 +86,9 @@ describe('ES search strategy', () => {
const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
await esSearch
.search({ params }, {}, (mockContext as unknown) as RequestHandlerContext)
.toPromise();
expect(mockSubmitCaller).toBeCalled();
const request = mockSubmitCaller.mock.calls[0][0];
@ -100,7 +102,9 @@ describe('ES search strategy', () => {
const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { id: 'foo', params });
await esSearch
.search({ id: 'foo', params }, {}, (mockContext as unknown) as RequestHandlerContext)
.toPromise();
expect(mockGetCaller).toBeCalled();
const request = mockGetCaller.mock.calls[0][0];
@ -115,10 +119,16 @@ describe('ES search strategy', () => {
const params = { index: 'foo-程', body: {} };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);
await esSearch.search((mockContext as unknown) as RequestHandlerContext, {
indexType: 'rollup',
params,
});
await esSearch
.search(
{
indexType: 'rollup',
params,
},
{},
(mockContext as unknown) as RequestHandlerContext
)
.toPromise();
expect(mockApiCaller).toBeCalled();
const { method, path } = mockApiCaller.mock.calls[0][0];
@ -132,7 +142,9 @@ describe('ES search strategy', () => {
const params = { index: 'foo-*', body: {} };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
await esSearch
.search({ params }, {}, (mockContext as unknown) as RequestHandlerContext)
.toPromise();
expect(mockSubmitCaller).toBeCalled();
const request = mockSubmitCaller.mock.calls[0][0];

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { from } from 'rxjs';
import { first } from 'rxjs/operators';
import { SearchResponse } from 'elasticsearch';
import { Observable } from 'rxjs';
@ -36,35 +37,38 @@ export const enhancedEsSearchStrategyProvider = (
logger: Logger,
usage?: SearchUsage
): ISearchStrategy => {
const search = async (
context: RequestHandlerContext,
const search = (
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
) => {
logger.debug(`search ${JSON.stringify(request.params) || request.id}`);
options: ISearchOptions,
context: RequestHandlerContext
) =>
from(
new Promise<IEsSearchResponse>(async (resolve, reject) => {
logger.debug(`search ${JSON.stringify(request.params) || request.id}`);
const isAsync = request.indexType !== 'rollup';
const isAsync = request.indexType !== 'rollup';
try {
const response = isAsync
? await asyncSearch(context, request, options)
: await rollupSearch(context, request, options);
try {
const response = isAsync
? await asyncSearch(request, options, context)
: await rollupSearch(request, options, context);
if (
usage &&
isAsync &&
isEnhancedEsSearchResponse(response) &&
isCompleteResponse(response)
) {
usage.trackSuccess(response.rawResponse.took);
}
if (
usage &&
isAsync &&
isEnhancedEsSearchResponse(response) &&
isCompleteResponse(response)
) {
usage.trackSuccess(response.rawResponse.took);
}
return response;
} catch (e) {
if (usage) usage.trackError();
throw e;
}
};
resolve(response);
} catch (e) {
if (usage) usage.trackError();
reject(e);
}
})
);
const cancel = async (context: RequestHandlerContext, id: string) => {
logger.debug(`cancel ${id}`);
@ -74,9 +78,9 @@ export const enhancedEsSearchStrategyProvider = (
};
async function asyncSearch(
context: RequestHandlerContext,
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
options: ISearchOptions,
context: RequestHandlerContext
): Promise<IEsSearchResponse> {
let promise: TransportRequestPromise<any>;
const esClient = context.core.elasticsearch.client.asCurrentUser;
@ -112,9 +116,9 @@ export const enhancedEsSearchStrategyProvider = (
}
const rollupSearch = async function (
context: RequestHandlerContext,
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
options: ISearchOptions,
context: RequestHandlerContext
): Promise<IEsSearchResponse> {
const esClient = context.core.elasticsearch.client.asCurrentUser;
const uiSettingsClient = await context.core.uiSettings.client;

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { from } from 'rxjs';
import isEmpty from 'lodash/isEmpty';
import { IndexPatternsFetcher, ISearchStrategy } from '../../../../../../src/plugins/data/server';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
@ -25,60 +26,63 @@ export const securitySolutionIndexFieldsProvider = (): ISearchStrategy<
const beatFields: BeatFields = require('../../utils/beat_schema/fields').fieldsBeat;
return {
search: async (context, request) => {
const { elasticsearch } = context.core;
const indexPatternsFetcher = new IndexPatternsFetcher(
elasticsearch.legacy.client.callAsCurrentUser
);
const dedupeIndices = dedupeIndexName(request.indices);
search: (request, options, context) =>
from(
new Promise<IndexFieldsStrategyResponse>(async (resolve) => {
const { elasticsearch } = context.core;
const indexPatternsFetcher = new IndexPatternsFetcher(
elasticsearch.legacy.client.callAsCurrentUser
);
const dedupeIndices = dedupeIndexName(request.indices);
const responsesIndexFields = await Promise.all(
dedupeIndices
.map((index) =>
indexPatternsFetcher.getFieldsForWildcard({
pattern: index,
})
)
.map((p) => p.catch((e) => false))
);
let indexFields: IndexField[] = [];
const responsesIndexFields = await Promise.all(
dedupeIndices
.map((index) =>
indexPatternsFetcher.getFieldsForWildcard({
pattern: index,
})
)
.map((p) => p.catch((e) => false))
);
let indexFields: IndexField[] = [];
if (!request.onlyCheckIfIndicesExist) {
indexFields = await formatIndexFields(
beatFields,
responsesIndexFields.filter((rif) => rif !== false) as FieldDescriptor[][],
dedupeIndices
);
}
if (!request.onlyCheckIfIndicesExist) {
indexFields = await formatIndexFields(
beatFields,
responsesIndexFields.filter((rif) => rif !== false) as FieldDescriptor[][],
dedupeIndices
);
}
return Promise.resolve({
indexFields,
indicesExist: dedupeIndices.filter((index, i) => responsesIndexFields[i] !== false),
rawResponse: {
timed_out: false,
took: -1,
_shards: {
total: -1,
successful: -1,
failed: -1,
skipped: -1,
},
hits: {
total: -1,
max_score: -1,
hits: [
{
_index: '',
_type: '',
_id: '',
_score: -1,
_source: null,
return resolve({
indexFields,
indicesExist: dedupeIndices.filter((index, i) => responsesIndexFields[i] !== false),
rawResponse: {
timed_out: false,
took: -1,
_shards: {
total: -1,
successful: -1,
failed: -1,
skipped: -1,
},
],
},
},
});
},
hits: {
total: -1,
max_score: -1,
hits: [
{
_index: '',
_type: '',
_id: '',
_score: -1,
_source: null,
},
],
},
},
});
})
),
};
};

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { mergeMap } from 'rxjs/operators';
import { ISearchStrategy, PluginStart } from '../../../../../../src/plugins/data/server';
import {
FactoryQueryTypes,
@ -19,15 +20,16 @@ export const securitySolutionSearchStrategyProvider = <T extends FactoryQueryTyp
const es = data.search.getSearchStrategy('es');
return {
search: async (context, request, options) => {
search: (request, options, context) => {
if (request.factoryQueryType == null) {
throw new Error('factoryQueryType is required');
}
const queryFactory: SecuritySolutionFactory<T> =
securitySolutionFactory[request.factoryQueryType];
const dsl = queryFactory.buildDsl(request);
const esSearchRes = await es.search(context, { ...request, params: dsl }, options);
return queryFactory.parse(request, esSearchRes);
return es
.search({ ...request, params: dsl }, options, context)
.pipe(mergeMap((esSearchRes) => queryFactory.parse(request, esSearchRes)));
},
cancel: async (context, id) => {
if (es.cancel) {

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { mergeMap } from 'rxjs/operators';
import { ISearchStrategy, PluginStart } from '../../../../../../src/plugins/data/server';
import {
TimelineFactoryQueryTypes,
@ -19,15 +20,17 @@ export const securitySolutionTimelineSearchStrategyProvider = <T extends Timelin
const es = data.search.getSearchStrategy('es');
return {
search: async (context, request, options) => {
search: (request, options, context) => {
if (request.factoryQueryType == null) {
throw new Error('factoryQueryType is required');
}
const queryFactory: SecuritySolutionTimelineFactory<T> =
securitySolutionTimelineFactory[request.factoryQueryType];
const dsl = queryFactory.buildDsl(request);
const esSearchRes = await es.search(context, { ...request, params: dsl }, options);
return queryFactory.parse(request, esSearchRes);
return es
.search({ ...request, params: dsl }, options, context)
.pipe(mergeMap((esSearchRes) => queryFactory.parse(request, esSearchRes)));
},
cancel: async (context, id) => {
if (es.cancel) {