[Search] Use new es client (#74529)

* improve test stability

* Use new client

* docs

* Use asyncSearch endpoints

* Clean up types

* Use transport request for now

* fixes

* Fix functional test

* encode

* remove eslint

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Liza Katz 2020-08-19 20:29:35 +03:00 committed by GitHub
parent 2c865f5649
commit b944dd3c96
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 96 additions and 98 deletions

View file

@ -47,7 +47,6 @@ import { NavigationPublicPluginStart } from '../../../../src/plugins/navigation/
import {
PLUGIN_ID,
PLUGIN_NAME,
IMyStrategyRequest,
IMyStrategyResponse,
SERVER_SEARCH_ROUTE_PATH,
} from '../../common';
@ -134,12 +133,9 @@ export const SearchExamplesApp = ({
query,
},
},
};
if (strategy) {
// Add a custom request parameter to be consumed by `MyStrategy`.
(request as IMyStrategyRequest).get_cool = getCool;
}
...(strategy ? { get_cool: getCool } : {}),
};
// Submit the search request using the `data.search` service.
const searchSubscription$ = data.search

View file

@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
import { SearchParams, SearchResponse } from 'elasticsearch';
import { SearchResponse } from 'elasticsearch';
import { Search } from '@elastic/elasticsearch/api/requestParams';
import { IKibanaSearchRequest, IKibanaSearchResponse } from '../types';
export const ES_SEARCH_STRATEGY = 'es';
export type ISearchRequestParams = {
trackTotalHits?: boolean;
} & SearchParams;
} & Search;
export interface IEsSearchRequest extends IKibanaSearchRequest {
params?: ISearchRequestParams;

View file

@ -61,7 +61,7 @@ import * as Rx from 'rxjs';
import { SavedObject } from 'src/core/server';
import { SavedObject as SavedObject_3 } from 'src/core/public';
import { SavedObjectsClientContract } from 'src/core/public';
import { SearchParams } from 'elasticsearch';
import { Search } from '@elastic/elasticsearch/api/requestParams';
import { SearchResponse as SearchResponse_2 } from 'elasticsearch';
import { SerializedFieldFormat as SerializedFieldFormat_2 } from 'src/plugins/expressions/common';
import { Subscription } from 'rxjs';

View file

@ -26,15 +26,17 @@ describe('ES search strategy', () => {
info: () => {},
};
const mockApiCaller = jest.fn().mockResolvedValue({
_shards: {
total: 10,
failed: 1,
skipped: 2,
successful: 7,
body: {
_shards: {
total: 10,
failed: 1,
skipped: 2,
successful: 7,
},
},
});
const mockContext = {
core: { elasticsearch: { legacy: { client: { callAsCurrentUser: mockApiCaller } } } },
core: { elasticsearch: { client: { asCurrentUser: { search: mockApiCaller } } } },
};
const mockConfig$ = pluginInitializerContextConfigMock<any>({}).legacy.globalConfig$;
@ -55,8 +57,7 @@ describe('ES search strategy', () => {
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toBe('search');
expect(mockApiCaller.mock.calls[0][1]).toEqual({
expect(mockApiCaller.mock.calls[0][0]).toEqual({
...params,
timeout: '0ms',
ignoreUnavailable: true,
@ -71,8 +72,7 @@ describe('ES search strategy', () => {
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toBe('search');
expect(mockApiCaller.mock.calls[0][1]).toEqual({
expect(mockApiCaller.mock.calls[0][0]).toEqual({
...params,
restTotalHitsAsInt: true,
});

View file

@ -20,6 +20,7 @@ import { first } from 'rxjs/operators';
import { SharedGlobalConfig, Logger } from 'kibana/server';
import { SearchResponse } from 'elasticsearch';
import { Observable } from 'rxjs';
import { ApiResponse } from '@elastic/elasticsearch';
import { SearchUsage } from '../collectors/usage';
import { ISearchStrategy, getDefaultSearchParams, getTotalLoaded } from '..';
@ -46,11 +47,10 @@ export const esSearchStrategyProvider = (
};
try {
const rawResponse = (await context.core.elasticsearch.legacy.client.callAsCurrentUser(
'search',
params,
options
)) as SearchResponse<any>;
const esResponse = (await context.core.elasticsearch.client.asCurrentUser.search(
params
)) as ApiResponse<SearchResponse<any>>;
const rawResponse = esResponse.body;
if (usage) usage.trackSuccess(rawResponse.took);

View file

@ -131,6 +131,7 @@ import { RequestStatistics } from 'src/plugins/inspector/common';
import { SavedObject } from 'src/core/server';
import { SavedObjectsClientContract as SavedObjectsClientContract_2 } from 'src/core/server';
import { ScrollParams } from 'elasticsearch';
import { Search } from '@elastic/elasticsearch/api/requestParams';
import { SearchParams } from 'elasticsearch';
import { SearchResponse } from 'elasticsearch';
import { SearchShardsParams } from 'elasticsearch';

View file

@ -9,8 +9,21 @@ import { pluginInitializerContextConfigMock } from '../../../../../src/core/serv
import { enhancedEsSearchStrategyProvider } from './es_search_strategy';
const mockAsyncResponse = {
id: 'foo',
response: {
body: {
id: 'foo',
response: {
_shards: {
total: 10,
failed: 1,
skipped: 2,
successful: 7,
},
},
},
};
const mockRollupResponse = {
body: {
_shards: {
total: 10,
failed: 1,
@ -20,22 +33,15 @@ const mockAsyncResponse = {
},
};
const mockRollupResponse = {
_shards: {
total: 10,
failed: 1,
skipped: 2,
successful: 7,
},
};
describe('ES search strategy', () => {
const mockApiCaller = jest.fn();
const mockLogger: any = {
info: () => {},
};
const mockContext = {
core: { elasticsearch: { legacy: { client: { callAsCurrentUser: mockApiCaller } } } },
core: {
elasticsearch: { client: { asCurrentUser: { transport: { request: mockApiCaller } } } },
},
};
const mockConfig$ = pluginInitializerContextConfigMock<any>({}).legacy.globalConfig$;
@ -58,8 +64,7 @@ describe('ES search strategy', () => {
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toBe('transport.request');
const { method, path, body } = mockApiCaller.mock.calls[0][1];
const { method, path, body } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('POST');
expect(path).toBe('/logstash-*/_async_search');
expect(body).toEqual({ query: {} });
@ -74,8 +79,7 @@ describe('ES search strategy', () => {
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { id: 'foo', params });
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toBe('transport.request');
const { method, path, body } = mockApiCaller.mock.calls[0][1];
const { method, path, body } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('GET');
expect(path).toBe('/_async_search/foo');
expect(body).toEqual(undefined);
@ -90,8 +94,7 @@ describe('ES search strategy', () => {
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toBe('transport.request');
const { method, path } = mockApiCaller.mock.calls[0][1];
const { method, path } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('POST');
expect(path).toBe('/foo-%E7%A8%8B/_async_search');
});
@ -108,8 +111,7 @@ describe('ES search strategy', () => {
});
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toBe('transport.request');
const { method, path } = mockApiCaller.mock.calls[0][1];
const { method, path } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('POST');
expect(path).toBe('/foo-%E7%A8%8B/_rollup_search');
});
@ -123,9 +125,8 @@ describe('ES search strategy', () => {
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
expect(mockApiCaller).toBeCalled();
expect(mockApiCaller.mock.calls[0][0]).toBe('transport.request');
const { query } = mockApiCaller.mock.calls[0][1];
expect(query).toHaveProperty('wait_for_completion_timeout');
expect(query).toHaveProperty('keep_alive');
const { querystring } = mockApiCaller.mock.calls[0][0];
expect(querystring).toHaveProperty('wait_for_completion_timeout');
expect(querystring).toHaveProperty('keep_alive');
});
});

View file

@ -6,41 +6,27 @@
import { first } from 'rxjs/operators';
import { mapKeys, snakeCase } from 'lodash';
import { SearchResponse } from 'elasticsearch';
import { Observable } from 'rxjs';
import { SearchResponse } from 'elasticsearch';
import {
LegacyAPICaller,
SharedGlobalConfig,
RequestHandlerContext,
ElasticsearchClient,
Logger,
} from '../../../../../src/core/server';
import {
ISearchOptions,
getDefaultSearchParams,
getTotalLoaded,
ISearchStrategy,
SearchUsage,
ISearchOptions,
} from '../../../../../src/plugins/data/server';
import { IEnhancedEsSearchRequest } from '../../common';
import { shimHitsTotal } from './shim_hits_total';
import { IEsSearchResponse } from '../../../../../src/plugins/data/common/search/es_search';
interface AsyncSearchResponse<T> {
id: string;
is_partial: boolean;
is_running: boolean;
response: SearchResponse<T>;
}
interface EnhancedEsSearchResponse extends IEsSearchResponse {
is_partial: boolean;
is_running: boolean;
}
function isEnhancedEsSearchResponse(
response: IEsSearchResponse
): response is EnhancedEsSearchResponse {
return response.hasOwnProperty('is_partial') && response.hasOwnProperty('is_running');
function isEnhancedEsSearchResponse(response: any): response is IEsSearchResponse {
return response.hasOwnProperty('isPartial') && response.hasOwnProperty('isRunning');
}
export const enhancedEsSearchStrategyProvider = (
@ -55,19 +41,23 @@ export const enhancedEsSearchStrategyProvider = (
) => {
logger.info(`search ${JSON.stringify(request.params) || request.id}`);
const config = await config$.pipe(first()).toPromise();
const caller = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const client = context.core.elasticsearch.client.asCurrentUser;
const defaultParams = getDefaultSearchParams(config);
const params = { ...defaultParams, ...request.params };
const isAsync = request.indexType !== 'rollup';
try {
const response =
request.indexType === 'rollup'
? await rollupSearch(caller, { ...request, params }, options)
: await asyncSearch(caller, { ...request, params }, options);
const response = isAsync
? await asyncSearch(client, { ...request, params }, options)
: await rollupSearch(client, { ...request, params }, options);
if (
usage &&
(!isEnhancedEsSearchResponse(response) || (!response.is_partial && !response.is_running))
isAsync &&
isEnhancedEsSearchResponse(response) &&
!response.isRunning &&
!response.isPartial
) {
usage.trackSuccess(response.rawResponse.took);
}
@ -81,11 +71,9 @@ export const enhancedEsSearchStrategyProvider = (
const cancel = async (context: RequestHandlerContext, id: string) => {
logger.info(`cancel ${id}`);
const method = 'DELETE';
const path = encodeURI(`/_async_search/${id}`);
await context.core.elasticsearch.legacy.client.callAsCurrentUser('transport.request', {
method,
path,
await context.core.elasticsearch.client.asCurrentUser.transport.request({
method: 'DELETE',
path: encodeURI(`/_async_search/${id}`),
});
};
@ -93,10 +81,10 @@ export const enhancedEsSearchStrategyProvider = (
};
async function asyncSearch(
caller: LegacyAPICaller,
client: ElasticsearchClient,
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
) {
): Promise<IEsSearchResponse> {
const { timeout = undefined, restTotalHitsAsInt = undefined, ...params } = {
...request.params,
};
@ -112,46 +100,57 @@ async function asyncSearch(
// Only report partial results every 64 shards; this should be reduced when we actually display partial results
const batchedReduceSize = request.id ? undefined : 64;
const query = toSnakeCase({
const asyncOptions = {
waitForCompletionTimeout: '100ms', // Wait up to 100ms for the response to return
keepAlive: '1m', // Extend the TTL for this search request by one minute
};
const querystring = toSnakeCase({
...asyncOptions,
...(batchedReduceSize && { batchedReduceSize }),
...queryParams,
});
// eslint-disable-next-line @typescript-eslint/naming-convention
const { id, response, is_partial, is_running } = (await caller(
'transport.request',
{ method, path, body, query },
options
)) as AsyncSearchResponse<any>;
// TODO: replace with async endpoints once https://github.com/elastic/elasticsearch-js/issues/1280 is resolved
const esResponse = await client.transport.request({
method,
path,
body,
querystring,
});
const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body;
return {
id,
isPartial: is_partial,
isRunning: is_running,
isPartial,
isRunning,
rawResponse: shimHitsTotal(response),
...getTotalLoaded(response._shards),
};
}
async function rollupSearch(
caller: LegacyAPICaller,
client: ElasticsearchClient,
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
) {
): Promise<IEsSearchResponse> {
const { body, index, ...params } = request.params!;
const method = 'POST';
const path = encodeURI(`/${index}/_rollup_search`);
const query = toSnakeCase(params);
const querystring = toSnakeCase(params);
const rawResponse = await ((caller(
'transport.request',
{ method, path, body, query },
options
) as unknown) as SearchResponse<any>);
const esResponse = await client.transport.request({
method,
path,
body,
querystring,
});
return { rawResponse, ...getTotalLoaded(rawResponse._shards) };
const response = esResponse.body as SearchResponse<any>;
return {
rawResponse: shimHitsTotal(response),
...getTotalLoaded(response._shards),
};
}
function toSnakeCase(obj: Record<string, any>) {