mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
[Step 2] Use Observables on server search API (#80709)
* [Step 2] Use Observables on server search API * apply some suggestions * use concatMap instead of expand * update docs * cleanup * fix PR comments * remove AsyncOptions * remove $config from eql_search_strategy * remove $config from es_search_strategy * remove DoSearchFnArgs, SearchMethod * some work * fix docs * remove waitForCompletion param * cleanup * some work * fix circular imports * Update src/plugins/data/server/search/es_search/es_search_rxjs_utils.ts Co-authored-by: Lukas Olson <olson.lukas@gmail.com> * Update src/plugins/data/common/search/es_search/es_search_rxjs_utils.ts Co-authored-by: Lukas Olson <olson.lukas@gmail.com> * Update src/plugins/data/common/search/es_search/es_search_rxjs_utils.ts Co-authored-by: Lukas Olson <olson.lukas@gmail.com> * Update x-pack/plugins/data_enhanced/common/search/es_search/es_search_rxjs_utils.ts Co-authored-by: Lukas Olson <olson.lukas@gmail.com> * fix PR comments * update docs * apply suggestions Co-authored-by: Lukas Olson <olson.lukas@gmail.com> Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
d4b2a5145a
commit
ffc4ba2514
52 changed files with 597 additions and 411 deletions
|
@ -7,5 +7,5 @@
|
|||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
isCompleteResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean | undefined
|
||||
isCompleteResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean
|
||||
```
|
||||
|
|
|
@ -7,5 +7,5 @@
|
|||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
isPartialResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean | undefined
|
||||
isPartialResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean
|
||||
```
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
```typescript
|
||||
export declare function getDefaultSearchParams(uiSettingsClient: IUiSettingsClient): Promise<{
|
||||
maxConcurrentShardRequests: number | undefined;
|
||||
ignoreThrottled: boolean;
|
||||
ignoreUnavailable: boolean;
|
||||
trackTotalHits: boolean;
|
||||
}>;
|
||||
|
@ -25,7 +24,6 @@ export declare function getDefaultSearchParams(uiSettingsClient: IUiSettingsClie
|
|||
|
||||
`Promise<{
|
||||
maxConcurrentShardRequests: number | undefined;
|
||||
ignoreThrottled: boolean;
|
||||
ignoreUnavailable: boolean;
|
||||
trackTotalHits: boolean;
|
||||
}>`
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
|
||||
|
||||
[Home](./index.md) > [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) > [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md) > [id](./kibana-plugin-plugins-data-server.iesrawsearchresponse.id.md)
|
||||
|
||||
## IEsRawSearchResponse.id property
|
||||
|
||||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
id?: string;
|
||||
```
|
|
@ -0,0 +1,11 @@
|
|||
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
|
||||
|
||||
[Home](./index.md) > [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) > [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md) > [is\_partial](./kibana-plugin-plugins-data-server.iesrawsearchresponse.is_partial.md)
|
||||
|
||||
## IEsRawSearchResponse.is\_partial property
|
||||
|
||||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
is_partial?: boolean;
|
||||
```
|
|
@ -0,0 +1,11 @@
|
|||
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
|
||||
|
||||
[Home](./index.md) > [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) > [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md) > [is\_running](./kibana-plugin-plugins-data-server.iesrawsearchresponse.is_running.md)
|
||||
|
||||
## IEsRawSearchResponse.is\_running property
|
||||
|
||||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
is_running?: boolean;
|
||||
```
|
|
@ -0,0 +1,20 @@
|
|||
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
|
||||
|
||||
[Home](./index.md) > [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) > [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md)
|
||||
|
||||
## IEsRawSearchResponse interface
|
||||
|
||||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
export interface IEsRawSearchResponse<Source = any> extends SearchResponse<Source>
|
||||
```
|
||||
|
||||
## Properties
|
||||
|
||||
| Property | Type | Description |
|
||||
| --- | --- | --- |
|
||||
| [id](./kibana-plugin-plugins-data-server.iesrawsearchresponse.id.md) | <code>string</code> | |
|
||||
| [is\_partial](./kibana-plugin-plugins-data-server.iesrawsearchresponse.is_partial.md) | <code>boolean</code> | |
|
||||
| [is\_running](./kibana-plugin-plugins-data-server.iesrawsearchresponse.is_running.md) | <code>boolean</code> | |
|
||||
|
|
@ -34,7 +34,6 @@
|
|||
| [parseInterval(interval)](./kibana-plugin-plugins-data-server.parseinterval.md) | |
|
||||
| [plugin(initializerContext)](./kibana-plugin-plugins-data-server.plugin.md) | Static code to be shared externally |
|
||||
| [shouldReadFieldFromDocValues(aggregatable, esType)](./kibana-plugin-plugins-data-server.shouldreadfieldfromdocvalues.md) | |
|
||||
| [toSnakeCase(obj)](./kibana-plugin-plugins-data-server.tosnakecase.md) | |
|
||||
| [usageProvider(core)](./kibana-plugin-plugins-data-server.usageprovider.md) | |
|
||||
|
||||
## Interfaces
|
||||
|
@ -45,6 +44,7 @@
|
|||
| [EsQueryConfig](./kibana-plugin-plugins-data-server.esqueryconfig.md) | |
|
||||
| [FieldDescriptor](./kibana-plugin-plugins-data-server.fielddescriptor.md) | |
|
||||
| [FieldFormatConfig](./kibana-plugin-plugins-data-server.fieldformatconfig.md) | |
|
||||
| [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md) | |
|
||||
| [IEsSearchRequest](./kibana-plugin-plugins-data-server.iessearchrequest.md) | |
|
||||
| [IFieldSubType](./kibana-plugin-plugins-data-server.ifieldsubtype.md) | |
|
||||
| [IFieldType](./kibana-plugin-plugins-data-server.ifieldtype.md) | |
|
||||
|
|
|
@ -8,6 +8,24 @@
|
|||
|
||||
```typescript
|
||||
search: {
|
||||
esSearch: {
|
||||
utils: {
|
||||
doSearch: <SearchResponse = any>(searchMethod: () => Promise<SearchResponse>, abortSignal?: AbortSignal | undefined) => import("rxjs").Observable<SearchResponse>;
|
||||
shimAbortSignal: <T extends import("../common").TransportRequestPromise<unknown>>(promise: T, signal: AbortSignal | undefined) => T;
|
||||
trackSearchStatus: <KibanaResponse extends import("../common").IKibanaSearchResponse<any> = import("./search").IEsSearchResponse<import("../../../core/server").SearchResponse<unknown>>>(logger: import("@kbn/logging/target/logger").Logger, usage?: import("./search").SearchUsage | undefined) => import("rxjs").UnaryFunction<import("rxjs").Observable<KibanaResponse>, import("rxjs").Observable<KibanaResponse>>;
|
||||
includeTotalLoaded: () => import("rxjs").OperatorFunction<import("../common").IKibanaSearchResponse<import("elasticsearch").SearchResponse<unknown>>, {
|
||||
total: number;
|
||||
loaded: number;
|
||||
id?: string | undefined;
|
||||
isRunning?: boolean | undefined;
|
||||
isPartial?: boolean | undefined;
|
||||
rawResponse: import("elasticsearch").SearchResponse<unknown>;
|
||||
}>;
|
||||
toKibanaSearchResponse: <SearchResponse_1 extends import("../common").IEsRawSearchResponse<any> = import("../common").IEsRawSearchResponse<any>, KibanaResponse_1 extends import("../common").IKibanaSearchResponse<any> = import("../common").IKibanaSearchResponse<SearchResponse_1>>() => import("rxjs").OperatorFunction<import("@elastic/elasticsearch").ApiResponse<SearchResponse_1, import("@elastic/elasticsearch/lib/Transport").Context>, KibanaResponse_1>;
|
||||
getTotalLoaded: typeof getTotalLoaded;
|
||||
toSnakeCase: typeof toSnakeCase;
|
||||
};
|
||||
};
|
||||
aggs: {
|
||||
CidrMask: typeof CidrMask;
|
||||
dateHistogramInterval: typeof dateHistogramInterval;
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
|
||||
|
||||
[Home](./index.md) > [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) > [toSnakeCase](./kibana-plugin-plugins-data-server.tosnakecase.md)
|
||||
|
||||
## toSnakeCase() function
|
||||
|
||||
<b>Signature:</b>
|
||||
|
||||
```typescript
|
||||
export declare function toSnakeCase(obj: Record<string, any>): import("lodash").Dictionary<any>;
|
||||
```
|
||||
|
||||
## Parameters
|
||||
|
||||
| Parameter | Type | Description |
|
||||
| --- | --- | --- |
|
||||
| obj | <code>Record<string, any></code> | |
|
||||
|
||||
<b>Returns:</b>
|
||||
|
||||
`import("lodash").Dictionary<any>`
|
||||
|
|
@ -14,7 +14,7 @@ pageLoadAssetSize:
|
|||
dashboard: 374194
|
||||
dashboardEnhanced: 65646
|
||||
dashboardMode: 22716
|
||||
data: 1317839
|
||||
data: 1319839
|
||||
dataEnhanced: 50420
|
||||
devTools: 38637
|
||||
discover: 105145
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import { from } from 'rxjs';
|
||||
import { map } from 'rxjs/operators';
|
||||
|
||||
import type { SearchResponse } from 'elasticsearch';
|
||||
import type { ApiResponse } from '@elastic/elasticsearch';
|
||||
|
||||
import { shimAbortSignal } from './shim_abort_signal';
|
||||
import { getTotalLoaded } from './get_total_loaded';
|
||||
|
||||
import type { IEsRawSearchResponse } from './types';
|
||||
import type { IKibanaSearchResponse } from '../types';
|
||||
|
||||
export const doSearch = <SearchResponse = any>(
|
||||
searchMethod: () => Promise<SearchResponse>,
|
||||
abortSignal?: AbortSignal
|
||||
) => from(shimAbortSignal(searchMethod(), abortSignal));
|
||||
|
||||
export const toKibanaSearchResponse = <
|
||||
SearchResponse extends IEsRawSearchResponse = IEsRawSearchResponse,
|
||||
KibanaResponse extends IKibanaSearchResponse = IKibanaSearchResponse<SearchResponse>
|
||||
>() =>
|
||||
map<ApiResponse<SearchResponse>, KibanaResponse>(
|
||||
(response) =>
|
||||
({
|
||||
id: response.body.id,
|
||||
isPartial: response.body.is_partial || false,
|
||||
isRunning: response.body.is_running || false,
|
||||
rawResponse: response.body,
|
||||
} as KibanaResponse)
|
||||
);
|
||||
|
||||
export const includeTotalLoaded = () =>
|
||||
map((response: IKibanaSearchResponse<SearchResponse<unknown>>) => ({
|
||||
...response,
|
||||
...getTotalLoaded(response.rawResponse._shards),
|
||||
}));
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { ShardsResponse } from 'elasticsearch';
|
||||
import type { ShardsResponse } from 'elasticsearch';
|
||||
|
||||
/**
|
||||
* Get the `total`/`loaded` for this response (see `IKibanaSearchResponse`). Note that `skipped` is
|
|
@ -19,3 +19,7 @@
|
|||
|
||||
export * from './types';
|
||||
export * from './utils';
|
||||
export * from './es_search_rxjs_utils';
|
||||
export * from './shim_abort_signal';
|
||||
export * from './to_snake_case';
|
||||
export * from './get_total_loaded';
|
||||
|
|
|
@ -17,12 +17,21 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { elasticsearchServiceMock } from '../../../../../core/server/mocks';
|
||||
import { shimAbortSignal } from '.';
|
||||
import { shimAbortSignal } from './shim_abort_signal';
|
||||
|
||||
const createSuccessTransportRequestPromise = (
|
||||
body: any,
|
||||
{ statusCode = 200 }: { statusCode?: number } = {}
|
||||
) => {
|
||||
const promise = Promise.resolve({ body, statusCode }) as any;
|
||||
promise.abort = jest.fn();
|
||||
|
||||
return promise;
|
||||
};
|
||||
|
||||
describe('shimAbortSignal', () => {
|
||||
it('aborts the promise if the signal is aborted', () => {
|
||||
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
|
||||
test('aborts the promise if the signal is aborted', () => {
|
||||
const promise = createSuccessTransportRequestPromise({
|
||||
success: true,
|
||||
});
|
||||
const controller = new AbortController();
|
||||
|
@ -32,8 +41,8 @@ describe('shimAbortSignal', () => {
|
|||
expect(promise.abort).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns the original promise', async () => {
|
||||
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
|
||||
test('returns the original promise', async () => {
|
||||
const promise = createSuccessTransportRequestPromise({
|
||||
success: true,
|
||||
});
|
||||
const controller = new AbortController();
|
||||
|
@ -42,8 +51,8 @@ describe('shimAbortSignal', () => {
|
|||
expect(response).toEqual(expect.objectContaining({ body: { success: true } }));
|
||||
});
|
||||
|
||||
it('allows the promise to be aborted manually', () => {
|
||||
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
|
||||
test('allows the promise to be aborted manually', () => {
|
||||
const promise = createSuccessTransportRequestPromise({
|
||||
success: true,
|
||||
});
|
||||
const controller = new AbortController();
|
|
@ -17,7 +17,13 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
|
||||
/**
|
||||
* @internal
|
||||
* TransportRequestPromise extends base Promise with an "abort" method
|
||||
*/
|
||||
export interface TransportRequestPromise<T> extends Promise<T> {
|
||||
abort?: () => void;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -30,12 +36,13 @@ import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
|
|||
*
|
||||
* @returns a TransportRequestPromise that will be aborted if the signal is aborted
|
||||
*/
|
||||
|
||||
export const shimAbortSignal = <T extends TransportRequestPromise<unknown>>(
|
||||
promise: T,
|
||||
signal: AbortSignal | undefined
|
||||
): T => {
|
||||
if (signal) {
|
||||
signal.addEventListener('abort', () => promise.abort());
|
||||
signal.addEventListener('abort', () => promise.abort && promise.abort());
|
||||
}
|
||||
return promise;
|
||||
};
|
|
@ -19,6 +19,6 @@
|
|||
|
||||
import { mapKeys, snakeCase } from 'lodash';
|
||||
|
||||
export function toSnakeCase(obj: Record<string, any>) {
|
||||
export function toSnakeCase(obj: Record<string, any>): Record<string, any> {
|
||||
return mapKeys(obj, (value, key) => snakeCase(key));
|
||||
}
|
|
@ -46,4 +46,10 @@ export interface IEsSearchRequest extends IKibanaSearchRequest<ISearchRequestPar
|
|||
indexType?: string;
|
||||
}
|
||||
|
||||
export interface IEsRawSearchResponse<Source = any> extends SearchResponse<Source> {
|
||||
id?: string;
|
||||
is_partial?: boolean;
|
||||
is_running?: boolean;
|
||||
}
|
||||
|
||||
export type IEsSearchResponse<Source = any> = IKibanaSearchResponse<SearchResponse<Source>>;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { IKibanaSearchResponse } from '..';
|
||||
import type { IKibanaSearchResponse } from '../types';
|
||||
|
||||
/**
|
||||
* @returns true if response had an error while executing in ES
|
||||
|
@ -30,12 +30,12 @@ export const isErrorResponse = (response?: IKibanaSearchResponse) => {
|
|||
* @returns true if response is completed successfully
|
||||
*/
|
||||
export const isCompleteResponse = (response?: IKibanaSearchResponse) => {
|
||||
return response && !response.isRunning && !response.isPartial;
|
||||
return Boolean(response && !response.isRunning && !response.isPartial);
|
||||
};
|
||||
|
||||
/**
|
||||
* @returns true if request is still running an/d response contains partial results
|
||||
*/
|
||||
export const isPartialResponse = (response?: IKibanaSearchResponse) => {
|
||||
return response && response.isRunning && response.isPartial;
|
||||
return Boolean(response && response.isRunning && response.isPartial);
|
||||
};
|
||||
|
|
|
@ -75,9 +75,9 @@ import { normalizeSortRequest } from './normalize_sort_request';
|
|||
import { filterDocvalueFields } from './filter_docvalue_fields';
|
||||
import { fieldWildcardFilter } from '../../../../kibana_utils/common';
|
||||
import { IIndexPattern } from '../../index_patterns';
|
||||
import { IEsSearchRequest, IEsSearchResponse, ISearchOptions } from '../..';
|
||||
import { IKibanaSearchRequest, IKibanaSearchResponse } from '../types';
|
||||
import { ISearchSource, SearchSourceOptions, SearchSourceFields } from './types';
|
||||
import { IEsSearchRequest, IEsSearchResponse, ISearchOptions } from '../../search';
|
||||
import type { IKibanaSearchRequest, IKibanaSearchResponse } from '../types';
|
||||
import type { ISearchSource, SearchSourceOptions, SearchSourceFields } from './types';
|
||||
import { FetchHandlers, RequestFailure, getSearchParamsFromRequest, SearchRequest } from './fetch';
|
||||
|
||||
import { getEsQueryConfig, buildEsQuery, Filter, UI_SETTINGS } from '../../../common';
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
*/
|
||||
|
||||
import { NameList } from 'elasticsearch';
|
||||
import { Filter, IndexPattern, Query } from '../..';
|
||||
import { Query } from '../..';
|
||||
import { Filter } from '../../es_query';
|
||||
import { IndexPattern } from '../../index_patterns';
|
||||
import { SearchSource } from './search_source';
|
||||
|
||||
/**
|
||||
|
|
|
@ -1382,7 +1382,7 @@ export type InputTimeRange = TimeRange | {
|
|||
// Warning: (ae-missing-release-tag) "isCompleteResponse" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
// @public (undocumented)
|
||||
export const isCompleteResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean | undefined;
|
||||
export const isCompleteResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean;
|
||||
|
||||
// Warning: (ae-missing-release-tag) "ISearch" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
|
@ -1470,7 +1470,7 @@ export const isFilters: (x: unknown) => x is Filter[];
|
|||
// Warning: (ae-missing-release-tag) "isPartialResponse" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
// @public (undocumented)
|
||||
export const isPartialResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean | undefined;
|
||||
export const isPartialResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean;
|
||||
|
||||
// Warning: (ae-missing-release-tag) "isQuery" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
|
@ -2015,7 +2015,7 @@ export class SearchInterceptor {
|
|||
// @internal
|
||||
protected pendingCount$: BehaviorSubject<number>;
|
||||
// @internal (undocumented)
|
||||
protected runSearch(request: IKibanaSearchRequest, signal: AbortSignal, strategy?: string): Observable<IKibanaSearchResponse>;
|
||||
protected runSearch(request: IKibanaSearchRequest, signal: AbortSignal, strategy?: string): Promise<IKibanaSearchResponse>;
|
||||
search(request: IKibanaSearchRequest, options?: ISearchOptions): Observable<IKibanaSearchResponse>;
|
||||
// @internal (undocumented)
|
||||
protected setupAbortSignal({ abortSignal, timeout, }: {
|
||||
|
|
|
@ -129,18 +129,17 @@ export class SearchInterceptor {
|
|||
request: IKibanaSearchRequest,
|
||||
signal: AbortSignal,
|
||||
strategy?: string
|
||||
): Observable<IKibanaSearchResponse> {
|
||||
): Promise<IKibanaSearchResponse> {
|
||||
const { id, ...searchRequest } = request;
|
||||
const path = trimEnd(`/internal/search/${strategy || ES_SEARCH_STRATEGY}/${id || ''}`, '/');
|
||||
const body = JSON.stringify(searchRequest);
|
||||
return from(
|
||||
this.deps.http.fetch({
|
||||
method: 'POST',
|
||||
path,
|
||||
body,
|
||||
signal,
|
||||
})
|
||||
);
|
||||
|
||||
return this.deps.http.fetch({
|
||||
method: 'POST',
|
||||
path,
|
||||
body,
|
||||
signal,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -235,7 +234,7 @@ export class SearchInterceptor {
|
|||
abortSignal: options?.abortSignal,
|
||||
});
|
||||
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
|
||||
return this.runSearch(request, combinedSignal, options?.strategy).pipe(
|
||||
return from(this.runSearch(request, combinedSignal, options?.strategy)).pipe(
|
||||
catchError((e: Error) => {
|
||||
return throwError(this.handleSearchError(e, request, timeoutSignal, options));
|
||||
}),
|
||||
|
|
|
@ -144,6 +144,7 @@ export {
|
|||
IndexPatternAttributes,
|
||||
UI_SETTINGS,
|
||||
IndexPattern,
|
||||
IEsRawSearchResponse,
|
||||
} from '../common';
|
||||
|
||||
/**
|
||||
|
@ -176,6 +177,13 @@ import {
|
|||
// tabify
|
||||
tabifyAggResponse,
|
||||
tabifyGetColumns,
|
||||
// search
|
||||
toSnakeCase,
|
||||
shimAbortSignal,
|
||||
doSearch,
|
||||
includeTotalLoaded,
|
||||
toKibanaSearchResponse,
|
||||
getTotalLoaded,
|
||||
} from '../common';
|
||||
|
||||
export {
|
||||
|
@ -213,19 +221,29 @@ export {
|
|||
ISearchStrategy,
|
||||
ISearchSetup,
|
||||
ISearchStart,
|
||||
toSnakeCase,
|
||||
getAsyncOptions,
|
||||
getDefaultSearchParams,
|
||||
getShardTimeout,
|
||||
getTotalLoaded,
|
||||
shimHitsTotal,
|
||||
usageProvider,
|
||||
shimAbortSignal,
|
||||
SearchUsage,
|
||||
} from './search';
|
||||
|
||||
import { trackSearchStatus } from './search';
|
||||
|
||||
// Search namespace
|
||||
export const search = {
|
||||
esSearch: {
|
||||
utils: {
|
||||
doSearch,
|
||||
shimAbortSignal,
|
||||
trackSearchStatus,
|
||||
includeTotalLoaded,
|
||||
toKibanaSearchResponse,
|
||||
// utils:
|
||||
getTotalLoaded,
|
||||
toSnakeCase,
|
||||
},
|
||||
},
|
||||
aggs: {
|
||||
CidrMask,
|
||||
dateHistogramInterval,
|
||||
|
|
|
@ -24,6 +24,7 @@ import {
|
|||
AggsCommonStart,
|
||||
getCalculateAutoTimeExpression,
|
||||
} from '../../../common';
|
||||
|
||||
import { AggsSetup, AggsStart } from './types';
|
||||
|
||||
import { mockAggTypesRegistry } from '../../../common/search/aggs/test_helpers';
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
import { pipe } from 'rxjs';
|
||||
import { tap } from 'rxjs/operators';
|
||||
|
||||
import type { Logger, SearchResponse } from 'kibana/server';
|
||||
import type { SearchUsage } from '../collectors';
|
||||
import type { IEsSearchResponse, IKibanaSearchResponse } from '../../../common/search';
|
||||
|
||||
/**
|
||||
* trackSearchStatus is a custom rxjs operator that can be used to track the progress of a search.
|
||||
* @param Logger
|
||||
* @param SearchUsage
|
||||
*/
|
||||
export const trackSearchStatus = <
|
||||
KibanaResponse extends IKibanaSearchResponse = IEsSearchResponse<SearchResponse<unknown>>
|
||||
>(
|
||||
logger: Logger,
|
||||
usage?: SearchUsage
|
||||
) => {
|
||||
return pipe(
|
||||
tap(
|
||||
(response: KibanaResponse) => {
|
||||
const trackSuccessData = response.rawResponse.took;
|
||||
|
||||
if (trackSuccessData !== undefined) {
|
||||
logger.debug(`trackSearchStatus:next ${trackSuccessData}`);
|
||||
usage?.trackSuccess(trackSuccessData);
|
||||
}
|
||||
},
|
||||
(err: any) => {
|
||||
logger.debug(`trackSearchStatus:error ${err}`);
|
||||
usage?.trackError();
|
||||
}
|
||||
)
|
||||
);
|
||||
};
|
|
@ -16,74 +16,46 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
import { Observable, from } from 'rxjs';
|
||||
import { Observable } from 'rxjs';
|
||||
import { first } from 'rxjs/operators';
|
||||
import { SharedGlobalConfig, Logger } from 'kibana/server';
|
||||
import { SearchResponse } from 'elasticsearch';
|
||||
import { ApiResponse } from '@elastic/elasticsearch';
|
||||
import { SearchUsage } from '../collectors/usage';
|
||||
import { toSnakeCase } from './to_snake_case';
|
||||
import {
|
||||
ISearchStrategy,
|
||||
getDefaultSearchParams,
|
||||
getTotalLoaded,
|
||||
getShardTimeout,
|
||||
shimAbortSignal,
|
||||
IEsSearchResponse,
|
||||
} from '..';
|
||||
|
||||
import type { Logger } from 'kibana/server';
|
||||
import type { ApiResponse } from '@elastic/elasticsearch';
|
||||
import type { SharedGlobalConfig } from 'kibana/server';
|
||||
|
||||
import { doSearch, includeTotalLoaded, toKibanaSearchResponse, toSnakeCase } from '../../../common';
|
||||
import { trackSearchStatus } from './es_search_rxjs_utils';
|
||||
import { getDefaultSearchParams, getShardTimeout } from '../es_search';
|
||||
|
||||
import type { ISearchStrategy } from '../types';
|
||||
import type { SearchUsage } from '../collectors/usage';
|
||||
import type { IEsRawSearchResponse } from '../../../common';
|
||||
|
||||
export const esSearchStrategyProvider = (
|
||||
config$: Observable<SharedGlobalConfig>,
|
||||
logger: Logger,
|
||||
usage?: SearchUsage
|
||||
): ISearchStrategy => {
|
||||
return {
|
||||
search: (request, options, context) =>
|
||||
from(
|
||||
new Promise<IEsSearchResponse>(async (resolve, reject) => {
|
||||
logger.debug(`search ${request.params?.index}`);
|
||||
const config = await config$.pipe(first()).toPromise();
|
||||
const uiSettingsClient = await context.core.uiSettings.client;
|
||||
): ISearchStrategy => ({
|
||||
search: (request, { abortSignal }, context) => {
|
||||
// Only default index pattern type is supported here.
|
||||
// See data_enhanced for other type support.
|
||||
if (request.indexType) {
|
||||
throw new Error(`Unsupported index pattern type ${request.indexType}`);
|
||||
}
|
||||
|
||||
// Only default index pattern type is supported here.
|
||||
// See data_enhanced for other type support.
|
||||
if (!!request.indexType) {
|
||||
throw new Error(`Unsupported index pattern type ${request.indexType}`);
|
||||
}
|
||||
return doSearch<ApiResponse<IEsRawSearchResponse>>(async () => {
|
||||
const config = await config$.pipe(first()).toPromise();
|
||||
const params = toSnakeCase({
|
||||
...(await getDefaultSearchParams(context.core.uiSettings.client)),
|
||||
...getShardTimeout(config),
|
||||
...request.params,
|
||||
});
|
||||
|
||||
// ignoreThrottled is not supported in OSS
|
||||
const { ignoreThrottled, ...defaultParams } = await getDefaultSearchParams(
|
||||
uiSettingsClient
|
||||
);
|
||||
|
||||
const params = toSnakeCase({
|
||||
...defaultParams,
|
||||
...getShardTimeout(config),
|
||||
...request.params,
|
||||
});
|
||||
|
||||
try {
|
||||
const promise = shimAbortSignal(
|
||||
context.core.elasticsearch.client.asCurrentUser.search(params),
|
||||
options?.abortSignal
|
||||
);
|
||||
const { body: rawResponse } = (await promise) as ApiResponse<SearchResponse<any>>;
|
||||
|
||||
if (usage) usage.trackSuccess(rawResponse.took);
|
||||
|
||||
// The above query will either complete or timeout and throw an error.
|
||||
// There is no progress indication on this api.
|
||||
resolve({
|
||||
isPartial: false,
|
||||
isRunning: false,
|
||||
rawResponse,
|
||||
...getTotalLoaded(rawResponse._shards),
|
||||
});
|
||||
} catch (e) {
|
||||
if (usage) usage.trackError();
|
||||
reject(e);
|
||||
}
|
||||
})
|
||||
),
|
||||
};
|
||||
};
|
||||
return context.core.elasticsearch.client.asCurrentUser.search(params);
|
||||
}, abortSignal).pipe(
|
||||
toKibanaSearchResponse(),
|
||||
trackSearchStatus(logger, usage),
|
||||
includeTotalLoaded()
|
||||
);
|
||||
},
|
||||
});
|
||||
|
|
|
@ -16,9 +16,8 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import { SharedGlobalConfig, IUiSettingsClient } from '../../../../../core/server';
|
||||
import { UI_SETTINGS } from '../../../common/constants';
|
||||
import type { SharedGlobalConfig, IUiSettingsClient } from '../../../../../core/server';
|
||||
|
||||
export function getShardTimeout(config: SharedGlobalConfig) {
|
||||
const timeout = config.elasticsearch.shardTimeout.asMilliseconds();
|
||||
|
@ -30,23 +29,13 @@ export function getShardTimeout(config: SharedGlobalConfig) {
|
|||
}
|
||||
|
||||
export async function getDefaultSearchParams(uiSettingsClient: IUiSettingsClient) {
|
||||
const ignoreThrottled = !(await uiSettingsClient.get(UI_SETTINGS.SEARCH_INCLUDE_FROZEN));
|
||||
const maxConcurrentShardRequests = await uiSettingsClient.get<number>(
|
||||
UI_SETTINGS.COURIER_MAX_CONCURRENT_SHARD_REQUESTS
|
||||
);
|
||||
return {
|
||||
maxConcurrentShardRequests:
|
||||
maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined,
|
||||
ignoreThrottled,
|
||||
ignoreUnavailable: true, // Don't fail if the index/indices don't exist
|
||||
trackTotalHits: true,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@internal
|
||||
*/
|
||||
export const getAsyncOptions = () => ({
|
||||
waitForCompletionTimeout: '100ms', // Wait up to 100ms for the response to return
|
||||
keepAlive: '1m', // Extend the TTL for this search request by one minute
|
||||
});
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
export { esSearchStrategyProvider } from './es_search_strategy';
|
||||
export * from './get_default_search_params';
|
||||
export { getTotalLoaded } from './get_total_loaded';
|
||||
export * from './to_snake_case';
|
||||
export { shimAbortSignal } from './shim_abort_signal';
|
||||
export * from './es_search_rxjs_utils';
|
||||
|
||||
export { ES_SEARCH_STRATEGY, IEsSearchRequest, IEsSearchResponse } from '../../../common';
|
||||
|
|
|
@ -61,7 +61,6 @@ describe('callMsearch', () => {
|
|||
},
|
||||
Object {
|
||||
"querystring": Object {
|
||||
"ignore_throttled": true,
|
||||
"ignore_unavailable": true,
|
||||
"max_concurrent_shard_requests": undefined,
|
||||
},
|
||||
|
|
|
@ -23,9 +23,10 @@ import { ApiResponse } from '@elastic/elasticsearch';
|
|||
import { SearchResponse } from 'elasticsearch';
|
||||
import { IUiSettingsClient, IScopedClusterClient, SharedGlobalConfig } from 'src/core/server';
|
||||
|
||||
import { MsearchRequestBody, MsearchResponse } from '../../../common/search/search_source';
|
||||
import type { MsearchRequestBody, MsearchResponse } from '../../../common/search/search_source';
|
||||
import { toSnakeCase, shimAbortSignal } from '../../../common/search/es_search';
|
||||
import { shimHitsTotal } from './shim_hits_total';
|
||||
import { getShardTimeout, getDefaultSearchParams, toSnakeCase, shimAbortSignal } from '..';
|
||||
import { getShardTimeout, getDefaultSearchParams } from '..';
|
||||
|
||||
/** @internal */
|
||||
export function convertRequestBody(
|
||||
|
|
|
@ -76,7 +76,6 @@ describe('msearch route', () => {
|
|||
);
|
||||
expect(mockClient.msearch.mock.calls[0][1].querystring).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"ignore_throttled": true,
|
||||
"ignore_unavailable": true,
|
||||
"max_concurrent_shard_requests": undefined,
|
||||
}
|
||||
|
|
|
@ -17,10 +17,11 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { first } from 'rxjs/operators';
|
||||
import { schema } from '@kbn/config-schema';
|
||||
import { IRouter } from 'src/core/server';
|
||||
import type { IRouter } from 'src/core/server';
|
||||
import { getRequestAbortedSignal } from '../../lib';
|
||||
import { SearchRouteDependencies } from '../search_service';
|
||||
import type { SearchRouteDependencies } from '../search_service';
|
||||
import { shimHitsTotal } from './shim_hits_total';
|
||||
|
||||
export function registerSearchRoute(
|
||||
|
@ -58,6 +59,7 @@ export function registerSearchRoute(
|
|||
},
|
||||
context
|
||||
)
|
||||
.pipe(first())
|
||||
.toPromise();
|
||||
|
||||
return res.ok({
|
||||
|
|
|
@ -51,7 +51,6 @@ import { SearchResponse } from 'elasticsearch';
|
|||
import { SerializedFieldFormat as SerializedFieldFormat_2 } from 'src/plugins/expressions/common';
|
||||
import { ShardsResponse } from 'elasticsearch';
|
||||
import { ToastInputFields } from 'src/core/public/notifications';
|
||||
import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
|
||||
import { Type } from '@kbn/config-schema';
|
||||
import { TypeOf } from '@kbn/config-schema';
|
||||
import { Unit } from '@elastic/datemath';
|
||||
|
@ -361,19 +360,12 @@ export type Filter = {
|
|||
query?: any;
|
||||
};
|
||||
|
||||
// @internal (undocumented)
|
||||
export const getAsyncOptions: () => {
|
||||
waitForCompletionTimeout: string;
|
||||
keepAlive: string;
|
||||
};
|
||||
|
||||
// Warning: (ae-forgotten-export) The symbol "IUiSettingsClient" needs to be exported by the entry point index.d.ts
|
||||
// Warning: (ae-missing-release-tag) "getDefaultSearchParams" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
// @public (undocumented)
|
||||
export function getDefaultSearchParams(uiSettingsClient: IUiSettingsClient): Promise<{
|
||||
maxConcurrentShardRequests: number | undefined;
|
||||
ignoreThrottled: boolean;
|
||||
ignoreUnavailable: boolean;
|
||||
trackTotalHits: boolean;
|
||||
}>;
|
||||
|
@ -397,12 +389,6 @@ export function getTime(indexPattern: IIndexPattern | undefined, timeRange: Time
|
|||
fieldName?: string;
|
||||
}): import("../..").RangeFilter | undefined;
|
||||
|
||||
// @internal
|
||||
export function getTotalLoaded({ total, failed, successful }: ShardsResponse): {
|
||||
total: number;
|
||||
loaded: number;
|
||||
};
|
||||
|
||||
// Warning: (ae-missing-release-tag) "IAggConfig" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
// @public
|
||||
|
@ -419,6 +405,18 @@ export type IAggConfigs = AggConfigs;
|
|||
// @public (undocumented)
|
||||
export type IAggType = AggType;
|
||||
|
||||
// Warning: (ae-missing-release-tag) "IEsRawSearchResponse" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
// @public (undocumented)
|
||||
export interface IEsRawSearchResponse<Source = any> extends SearchResponse<Source> {
|
||||
// (undocumented)
|
||||
id?: string;
|
||||
// (undocumented)
|
||||
is_partial?: boolean;
|
||||
// (undocumented)
|
||||
is_running?: boolean;
|
||||
}
|
||||
|
||||
// Warning: (ae-forgotten-export) The symbol "IKibanaSearchRequest" needs to be exported by the entry point index.d.ts
|
||||
// Warning: (ae-forgotten-export) The symbol "ISearchRequestParams" needs to be exported by the entry point index.d.ts
|
||||
// Warning: (ae-missing-release-tag) "IEsSearchRequest" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
|
@ -958,6 +956,24 @@ export interface RefreshInterval {
|
|||
//
|
||||
// @public (undocumented)
|
||||
export const search: {
|
||||
esSearch: {
|
||||
utils: {
|
||||
doSearch: <SearchResponse = any>(searchMethod: () => Promise<SearchResponse>, abortSignal?: AbortSignal | undefined) => import("rxjs").Observable<SearchResponse>;
|
||||
shimAbortSignal: <T extends import("../common").TransportRequestPromise<unknown>>(promise: T, signal: AbortSignal | undefined) => T;
|
||||
trackSearchStatus: <KibanaResponse extends import("../common").IKibanaSearchResponse<any> = import("./search").IEsSearchResponse<import("../../../core/server").SearchResponse<unknown>>>(logger: import("@kbn/logging/target/logger").Logger, usage?: import("./search").SearchUsage | undefined) => import("rxjs").UnaryFunction<import("rxjs").Observable<KibanaResponse>, import("rxjs").Observable<KibanaResponse>>;
|
||||
includeTotalLoaded: () => import("rxjs").OperatorFunction<import("../common").IKibanaSearchResponse<import("elasticsearch").SearchResponse<unknown>>, {
|
||||
total: number;
|
||||
loaded: number;
|
||||
id?: string | undefined;
|
||||
isRunning?: boolean | undefined;
|
||||
isPartial?: boolean | undefined;
|
||||
rawResponse: import("elasticsearch").SearchResponse<unknown>;
|
||||
}>;
|
||||
toKibanaSearchResponse: <SearchResponse_1 extends import("../common").IEsRawSearchResponse<any> = import("../common").IEsRawSearchResponse<any>, KibanaResponse_1 extends import("../common").IKibanaSearchResponse<any> = import("../common").IKibanaSearchResponse<SearchResponse_1>>() => import("rxjs").OperatorFunction<import("@elastic/elasticsearch").ApiResponse<SearchResponse_1, import("@elastic/elasticsearch/lib/Transport").Context>, KibanaResponse_1>;
|
||||
getTotalLoaded: typeof getTotalLoaded;
|
||||
toSnakeCase: typeof toSnakeCase;
|
||||
};
|
||||
};
|
||||
aggs: {
|
||||
CidrMask: typeof CidrMask;
|
||||
dateHistogramInterval: typeof dateHistogramInterval;
|
||||
|
@ -1001,9 +1017,6 @@ export interface SearchUsage {
|
|||
trackSuccess(duration: number): Promise<void>;
|
||||
}
|
||||
|
||||
// @internal
|
||||
export const shimAbortSignal: <T extends TransportRequestPromise<unknown>>(promise: T, signal: AbortSignal | undefined) => T;
|
||||
|
||||
// @internal
|
||||
export function shimHitsTotal(response: SearchResponse<any>): {
|
||||
hits: {
|
||||
|
@ -1066,11 +1079,6 @@ export type TimeRange = {
|
|||
mode?: 'absolute' | 'relative';
|
||||
};
|
||||
|
||||
// Warning: (ae-missing-release-tag) "toSnakeCase" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
// @public (undocumented)
|
||||
export function toSnakeCase(obj: Record<string, any>): import("lodash").Dictionary<any>;
|
||||
|
||||
// Warning: (ae-missing-release-tag) "UI_SETTINGS" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
|
||||
//
|
||||
// @public (undocumented)
|
||||
|
@ -1139,19 +1147,21 @@ export function usageProvider(core: CoreSetup_2): SearchUsage;
|
|||
// src/plugins/data/server/index.ts:101:26 - (ae-forgotten-export) The symbol "TruncateFormat" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:127:27 - (ae-forgotten-export) The symbol "isFilterable" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:127:27 - (ae-forgotten-export) The symbol "isNestedField" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:228:20 - (ae-forgotten-export) The symbol "getRequestInspectorStats" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:228:20 - (ae-forgotten-export) The symbol "getResponseInspectorStats" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:228:20 - (ae-forgotten-export) The symbol "tabifyAggResponse" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:228:20 - (ae-forgotten-export) The symbol "tabifyGetColumns" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:230:1 - (ae-forgotten-export) The symbol "CidrMask" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:231:1 - (ae-forgotten-export) The symbol "dateHistogramInterval" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:240:1 - (ae-forgotten-export) The symbol "InvalidEsCalendarIntervalError" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:241:1 - (ae-forgotten-export) The symbol "InvalidEsIntervalFormatError" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:242:1 - (ae-forgotten-export) The symbol "Ipv4Address" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:246:1 - (ae-forgotten-export) The symbol "isValidEsInterval" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:247:1 - (ae-forgotten-export) The symbol "isValidInterval" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:251:1 - (ae-forgotten-export) The symbol "propFilter" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:254:1 - (ae-forgotten-export) The symbol "toAbsoluteDates" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:234:20 - (ae-forgotten-export) The symbol "getRequestInspectorStats" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:234:20 - (ae-forgotten-export) The symbol "getResponseInspectorStats" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:234:20 - (ae-forgotten-export) The symbol "tabifyAggResponse" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:234:20 - (ae-forgotten-export) The symbol "tabifyGetColumns" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:249:5 - (ae-forgotten-export) The symbol "getTotalLoaded" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:250:5 - (ae-forgotten-export) The symbol "toSnakeCase" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:254:1 - (ae-forgotten-export) The symbol "CidrMask" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:255:1 - (ae-forgotten-export) The symbol "dateHistogramInterval" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:264:1 - (ae-forgotten-export) The symbol "InvalidEsCalendarIntervalError" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:265:1 - (ae-forgotten-export) The symbol "InvalidEsIntervalFormatError" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:266:1 - (ae-forgotten-export) The symbol "Ipv4Address" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:270:1 - (ae-forgotten-export) The symbol "isValidEsInterval" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:271:1 - (ae-forgotten-export) The symbol "isValidInterval" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:275:1 - (ae-forgotten-export) The symbol "propFilter" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index.ts:278:1 - (ae-forgotten-export) The symbol "toAbsoluteDates" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/index_patterns/index_patterns_service.ts:50:14 - (ae-forgotten-export) The symbol "IndexPatternsService" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/plugin.ts:88:66 - (ae-forgotten-export) The symbol "DataEnhancements" needs to be exported by the entry point index.d.ts
|
||||
// src/plugins/data/server/search/types.ts:91:5 - (ae-forgotten-export) The symbol "ISearchStartSearchSource" needs to be exported by the entry point index.d.ts
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
import _ from 'lodash';
|
||||
import { IRouter, CoreSetup } from 'kibana/server';
|
||||
import { ES_SEARCH_STRATEGY } from '../../../data/server';
|
||||
import { TimelionPluginStartDeps } from '../plugin';
|
||||
|
||||
export function validateEsRoute(router: IRouter, core: CoreSetup) {
|
||||
|
@ -57,17 +56,7 @@ export function validateEsRoute(router: IRouter, core: CoreSetup) {
|
|||
|
||||
let resp;
|
||||
try {
|
||||
resp = (
|
||||
await deps.data.search
|
||||
.search(
|
||||
body,
|
||||
{
|
||||
strategy: ES_SEARCH_STRATEGY,
|
||||
},
|
||||
context
|
||||
)
|
||||
.toPromise()
|
||||
).rawResponse;
|
||||
resp = (await deps.data.search.search(body, {}, context).toPromise()).rawResponse;
|
||||
} catch (errResp) {
|
||||
resp = errResp;
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
import { i18n } from '@kbn/i18n';
|
||||
import _ from 'lodash';
|
||||
import { ES_SEARCH_STRATEGY } from '../../../../data/server';
|
||||
import Datasource from '../../lib/classes/datasource';
|
||||
import buildRequest from './lib/build_request';
|
||||
import toSeriesList from './lib/agg_response_to_series_list';
|
||||
|
@ -135,7 +134,6 @@ export default new Datasource('es', {
|
|||
.search(
|
||||
body,
|
||||
{
|
||||
strategy: ES_SEARCH_STRATEGY,
|
||||
sessionId: tlConfig.request?.body.sessionId,
|
||||
},
|
||||
tlConfig.context
|
||||
|
|
|
@ -35,7 +35,7 @@ describe('AbstractSearchStrategy', () => {
|
|||
},
|
||||
};
|
||||
|
||||
abstractSearchStrategy = new AbstractSearchStrategy('es');
|
||||
abstractSearchStrategy = new AbstractSearchStrategy();
|
||||
});
|
||||
|
||||
test('should init an AbstractSearchStrategy instance', () => {
|
||||
|
@ -90,9 +90,7 @@ describe('AbstractSearchStrategy', () => {
|
|||
},
|
||||
indexType: undefined,
|
||||
},
|
||||
{
|
||||
strategy: 'es',
|
||||
},
|
||||
{},
|
||||
{}
|
||||
);
|
||||
});
|
||||
|
|
|
@ -45,12 +45,10 @@ export type ReqFacade = FakeRequest & {
|
|||
};
|
||||
|
||||
export class AbstractSearchStrategy {
|
||||
public searchStrategyName!: string;
|
||||
public indexType?: string;
|
||||
public additionalParams: any;
|
||||
|
||||
constructor(name: string, type?: string, additionalParams: any = {}) {
|
||||
this.searchStrategyName = name;
|
||||
constructor(type?: string, additionalParams: any = {}) {
|
||||
this.indexType = type;
|
||||
this.additionalParams = additionalParams;
|
||||
}
|
||||
|
@ -71,7 +69,6 @@ export class AbstractSearchStrategy {
|
|||
},
|
||||
{
|
||||
...options,
|
||||
strategy: this.searchStrategyName,
|
||||
},
|
||||
req.requestContext
|
||||
)
|
||||
|
|
|
@ -17,17 +17,12 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
import { ES_SEARCH_STRATEGY } from '../../../../../data/server';
|
||||
import { AbstractSearchStrategy } from './abstract_search_strategy';
|
||||
import { DefaultSearchCapabilities } from '../default_search_capabilities';
|
||||
|
||||
export class DefaultSearchStrategy extends AbstractSearchStrategy {
|
||||
name = 'default';
|
||||
|
||||
constructor() {
|
||||
super(ES_SEARCH_STRATEGY);
|
||||
}
|
||||
|
||||
checkForViability(req) {
|
||||
return {
|
||||
isViable: true,
|
||||
|
|
|
@ -12,4 +12,7 @@ export {
|
|||
EqlSearchStrategyResponse,
|
||||
IAsyncSearchRequest,
|
||||
IEnhancedEsSearchRequest,
|
||||
IAsyncSearchOptions,
|
||||
doPartialSearch,
|
||||
throwOnEsError,
|
||||
} from './search';
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { of, merge, timer, throwError } from 'rxjs';
|
||||
import { takeWhile, switchMap, expand, mergeMap, tap } from 'rxjs/operators';
|
||||
|
||||
import {
|
||||
AbortError,
|
||||
doSearch,
|
||||
IKibanaSearchResponse,
|
||||
isErrorResponse,
|
||||
} from '../../../../../../src/plugins/data/common';
|
||||
import type { IKibanaSearchRequest } from '../../../../../../src/plugins/data/common';
|
||||
import type { IAsyncSearchOptions } from '../../../common/search/types';
|
||||
|
||||
const DEFAULT_POLLING_INTERVAL = 1000;
|
||||
|
||||
export const doPartialSearch = <SearchResponse = any>(
|
||||
searchMethod: () => Promise<SearchResponse>,
|
||||
partialSearchMethod: (id: IKibanaSearchRequest['id']) => Promise<SearchResponse>,
|
||||
isCompleteResponse: (response: SearchResponse) => boolean,
|
||||
getId: (response: SearchResponse) => IKibanaSearchRequest['id'],
|
||||
requestId: IKibanaSearchRequest['id'],
|
||||
{ abortSignal, pollInterval = DEFAULT_POLLING_INTERVAL }: IAsyncSearchOptions
|
||||
) =>
|
||||
doSearch<SearchResponse>(
|
||||
requestId ? () => partialSearchMethod(requestId) : searchMethod,
|
||||
abortSignal
|
||||
).pipe(
|
||||
tap((response) => (requestId = getId(response))),
|
||||
expand(() => timer(pollInterval).pipe(switchMap(() => partialSearchMethod(requestId)))),
|
||||
takeWhile((response) => !isCompleteResponse(response), true)
|
||||
);
|
||||
|
||||
export const throwOnEsError = () =>
|
||||
mergeMap((r: IKibanaSearchResponse) =>
|
||||
isErrorResponse(r) ? merge(of(r), throwError(new AbortError())) : of(r)
|
||||
);
|
|
@ -4,4 +4,4 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
export { IAsyncSearchOptions } from './types';
|
||||
export * from './es_search_rxjs_utils';
|
|
@ -5,3 +5,4 @@
|
|||
*/
|
||||
|
||||
export * from './types';
|
||||
export * from './es_search';
|
||||
|
|
|
@ -8,6 +8,7 @@ import { EqlSearch } from '@elastic/elasticsearch/api/requestParams';
|
|||
import { ApiResponse, TransportRequestOptions } from '@elastic/elasticsearch/lib/Transport';
|
||||
|
||||
import {
|
||||
ISearchOptions,
|
||||
IEsSearchRequest,
|
||||
IKibanaSearchRequest,
|
||||
IKibanaSearchResponse,
|
||||
|
@ -38,3 +39,10 @@ export interface EqlSearchStrategyRequest extends IKibanaSearchRequest<EqlReques
|
|||
}
|
||||
|
||||
export type EqlSearchStrategyResponse<T = unknown> = IKibanaSearchResponse<ApiResponse<T>>;
|
||||
|
||||
export interface IAsyncSearchOptions extends ISearchOptions {
|
||||
/**
|
||||
* The number of milliseconds to wait between receiving a response and sending another request
|
||||
*/
|
||||
pollInterval?: number;
|
||||
}
|
||||
|
|
|
@ -4,18 +4,24 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { throwError, EMPTY, timer, from, Subscription } from 'rxjs';
|
||||
import { mergeMap, expand, takeUntil, finalize, catchError } from 'rxjs/operators';
|
||||
import { throwError, from, Subscription } from 'rxjs';
|
||||
import { tap, takeUntil, finalize, catchError } from 'rxjs/operators';
|
||||
import {
|
||||
TimeoutErrorMode,
|
||||
IEsSearchResponse,
|
||||
SearchInterceptor,
|
||||
SearchInterceptorDeps,
|
||||
UI_SETTINGS,
|
||||
} from '../../../../../src/plugins/data/public';
|
||||
import { isErrorResponse, isCompleteResponse } from '../../../../../src/plugins/data/public';
|
||||
import { AbortError, toPromise } from '../../../../../src/plugins/data/common';
|
||||
import { TimeoutErrorMode } from '../../../../../src/plugins/data/public';
|
||||
import { IAsyncSearchOptions } from '.';
|
||||
import { IAsyncSearchRequest, ENHANCED_ES_SEARCH_STRATEGY } from '../../common';
|
||||
|
||||
import {
|
||||
IAsyncSearchRequest,
|
||||
ENHANCED_ES_SEARCH_STRATEGY,
|
||||
IAsyncSearchOptions,
|
||||
doPartialSearch,
|
||||
throwOnEsError,
|
||||
} from '../../common';
|
||||
|
||||
export class EnhancedSearchInterceptor extends SearchInterceptor {
|
||||
private uiSettingsSub: Subscription;
|
||||
|
@ -69,35 +75,24 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
|
|||
|
||||
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
|
||||
|
||||
return this.runSearch(request, combinedSignal, strategy).pipe(
|
||||
expand((response) => {
|
||||
// If the response indicates of an error, stop polling and complete the observable
|
||||
if (isErrorResponse(response)) {
|
||||
return throwError(new AbortError());
|
||||
}
|
||||
|
||||
// If the response indicates it is complete, stop polling and complete the observable
|
||||
if (isCompleteResponse(response)) {
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
id = response.id;
|
||||
// Delay by the given poll interval
|
||||
return timer(pollInterval).pipe(
|
||||
// Send future requests using just the ID from the response
|
||||
mergeMap(() => {
|
||||
return this.runSearch({ ...request, id }, combinedSignal, strategy);
|
||||
})
|
||||
);
|
||||
return doPartialSearch<IEsSearchResponse>(
|
||||
() => this.runSearch(request, combinedSignal, strategy),
|
||||
(requestId) => this.runSearch({ ...request, id: requestId }, combinedSignal, strategy),
|
||||
(r) => !r.isRunning,
|
||||
(response) => response.id,
|
||||
id,
|
||||
{ pollInterval }
|
||||
).pipe(
|
||||
tap((r) => {
|
||||
id = r.id ?? id;
|
||||
}),
|
||||
throwOnEsError(),
|
||||
takeUntil(from(abortedPromise.promise)),
|
||||
catchError((e: any) => {
|
||||
// If we haven't received the response to the initial request, including the ID, then
|
||||
// we don't need to send a follow-up request to delete this search. Otherwise, we
|
||||
// send the follow-up request to delete this search, then throw an abort error.
|
||||
if (id !== undefined) {
|
||||
catchError((e: AbortError) => {
|
||||
if (id) {
|
||||
this.deps.http.delete(`/internal/search/${strategy}/${id}`);
|
||||
}
|
||||
|
||||
return throwError(this.handleSearchError(e, request, timeoutSignal, options));
|
||||
}),
|
||||
finalize(() => {
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { ISearchOptions } from '../../../../../src/plugins/data/public';
|
||||
|
||||
export interface IAsyncSearchOptions extends ISearchOptions {
|
||||
/**
|
||||
* The number of milliseconds to wait between receiving a response and sending another request
|
||||
*/
|
||||
pollInterval?: number;
|
||||
}
|
|
@ -3,8 +3,8 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import type { RequestHandlerContext, Logger } from 'kibana/server';
|
||||
|
||||
import { Logger, RequestHandlerContext } from 'src/core/server';
|
||||
import { EqlSearchStrategyRequest } from '../../common/search/types';
|
||||
import { eqlSearchStrategyProvider } from './eql_search_strategy';
|
||||
|
||||
|
|
|
@ -4,18 +4,18 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { from } from 'rxjs';
|
||||
import { Logger } from 'kibana/server';
|
||||
import { ApiResponse, TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
|
||||
import type { Logger } from 'kibana/server';
|
||||
import type { ApiResponse } from '@elastic/elasticsearch';
|
||||
|
||||
import {
|
||||
getAsyncOptions,
|
||||
getDefaultSearchParams,
|
||||
ISearchStrategy,
|
||||
toSnakeCase,
|
||||
shimAbortSignal,
|
||||
} from '../../../../../src/plugins/data/server';
|
||||
import { EqlSearchStrategyRequest, EqlSearchStrategyResponse } from '../../common/search/types';
|
||||
import { search } from '../../../../../src/plugins/data/server';
|
||||
import { doPartialSearch } from '../../common/search/es_search/es_search_rxjs_utils';
|
||||
import { getAsyncOptions, getDefaultSearchParams } from './get_default_search_params';
|
||||
|
||||
import type { ISearchStrategy, IEsRawSearchResponse } from '../../../../../src/plugins/data/server';
|
||||
import type {
|
||||
EqlSearchStrategyRequest,
|
||||
EqlSearchStrategyResponse,
|
||||
} from '../../common/search/types';
|
||||
|
||||
export const eqlSearchStrategyProvider = (
|
||||
logger: Logger
|
||||
|
@ -27,56 +27,44 @@ export const eqlSearchStrategyProvider = (
|
|||
id,
|
||||
});
|
||||
},
|
||||
search: (request, options, context) =>
|
||||
from(
|
||||
new Promise<EqlSearchStrategyResponse>(async (resolve, reject) => {
|
||||
logger.debug(`_eql/search ${JSON.stringify(request.params) || request.id}`);
|
||||
let promise: TransportRequestPromise<ApiResponse>;
|
||||
|
||||
try {
|
||||
const eqlClient = context.core.elasticsearch.client.asCurrentUser.eql;
|
||||
const uiSettingsClient = await context.core.uiSettings.client;
|
||||
const asyncOptions = getAsyncOptions();
|
||||
const searchOptions = toSnakeCase({ ...request.options });
|
||||
search: (request, options, context) => {
|
||||
logger.debug(`_eql/search ${JSON.stringify(request.params) || request.id}`);
|
||||
|
||||
if (request.id) {
|
||||
promise = eqlClient.get(
|
||||
{
|
||||
id: request.id,
|
||||
...toSnakeCase(asyncOptions),
|
||||
},
|
||||
searchOptions
|
||||
);
|
||||
} else {
|
||||
const { ignoreThrottled, ignoreUnavailable } = await getDefaultSearchParams(
|
||||
uiSettingsClient
|
||||
);
|
||||
const searchParams = toSnakeCase({
|
||||
ignoreThrottled,
|
||||
ignoreUnavailable,
|
||||
...asyncOptions,
|
||||
...request.params,
|
||||
});
|
||||
const { utils } = search.esSearch;
|
||||
const asyncOptions = getAsyncOptions();
|
||||
const requestOptions = utils.toSnakeCase({ ...request.options });
|
||||
const client = context.core.elasticsearch.client.asCurrentUser.eql;
|
||||
|
||||
promise = eqlClient.search(
|
||||
searchParams as EqlSearchStrategyRequest['params'],
|
||||
searchOptions
|
||||
);
|
||||
}
|
||||
return doPartialSearch<ApiResponse<IEsRawSearchResponse>>(
|
||||
async () => {
|
||||
const { ignoreThrottled, ignoreUnavailable } = await getDefaultSearchParams(
|
||||
context.core.uiSettings.client
|
||||
);
|
||||
|
||||
const rawResponse = await shimAbortSignal(promise, options?.abortSignal);
|
||||
const { id, is_partial: isPartial, is_running: isRunning } = rawResponse.body;
|
||||
|
||||
resolve({
|
||||
id,
|
||||
isPartial,
|
||||
isRunning,
|
||||
rawResponse,
|
||||
});
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
}
|
||||
})
|
||||
),
|
||||
return client.search(
|
||||
utils.toSnakeCase({
|
||||
ignoreThrottled,
|
||||
ignoreUnavailable,
|
||||
...asyncOptions,
|
||||
...request.params,
|
||||
}) as EqlSearchStrategyRequest['params'],
|
||||
requestOptions
|
||||
);
|
||||
},
|
||||
(id) =>
|
||||
client.get(
|
||||
{
|
||||
id: id!,
|
||||
...utils.toSnakeCase(asyncOptions),
|
||||
},
|
||||
requestOptions
|
||||
),
|
||||
(response) => !response.body.is_running,
|
||||
(response) => response.body.id,
|
||||
request.id,
|
||||
options
|
||||
).pipe(utils.toKibanaSearchResponse());
|
||||
},
|
||||
};
|
||||
};
|
||||
|
|
|
@ -5,31 +5,36 @@
|
|||
*/
|
||||
|
||||
import { from } from 'rxjs';
|
||||
import { first } from 'rxjs/operators';
|
||||
import { SearchResponse } from 'elasticsearch';
|
||||
import { first, map } from 'rxjs/operators';
|
||||
import { Observable } from 'rxjs';
|
||||
import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
|
||||
import { SharedGlobalConfig, RequestHandlerContext, Logger } from '../../../../../src/core/server';
|
||||
import {
|
||||
getTotalLoaded,
|
||||
|
||||
import type { SearchResponse } from 'elasticsearch';
|
||||
import type { ApiResponse } from '@elastic/elasticsearch';
|
||||
|
||||
import { getShardTimeout, shimHitsTotal, search } from '../../../../../src/plugins/data/server';
|
||||
import { doPartialSearch } from '../../common/search/es_search/es_search_rxjs_utils';
|
||||
import { getDefaultSearchParams, getAsyncOptions } from './get_default_search_params';
|
||||
|
||||
import type {
|
||||
SharedGlobalConfig,
|
||||
RequestHandlerContext,
|
||||
Logger,
|
||||
} from '../../../../../src/core/server';
|
||||
|
||||
import type {
|
||||
ISearchStrategy,
|
||||
SearchUsage,
|
||||
getDefaultSearchParams,
|
||||
getShardTimeout,
|
||||
toSnakeCase,
|
||||
shimHitsTotal,
|
||||
getAsyncOptions,
|
||||
shimAbortSignal,
|
||||
} from '../../../../../src/plugins/data/server';
|
||||
import { IEnhancedEsSearchRequest } from '../../common';
|
||||
import {
|
||||
IEsRawSearchResponse,
|
||||
ISearchOptions,
|
||||
IEsSearchResponse,
|
||||
isCompleteResponse,
|
||||
} from '../../../../../src/plugins/data/common/search';
|
||||
} from '../../../../../src/plugins/data/server';
|
||||
|
||||
function isEnhancedEsSearchResponse(response: any): response is IEsSearchResponse {
|
||||
return response.hasOwnProperty('isPartial') && response.hasOwnProperty('isRunning');
|
||||
import type { IEnhancedEsSearchRequest } from '../../common';
|
||||
|
||||
const { utils } = search.esSearch;
|
||||
|
||||
interface IEsRawAsyncSearchResponse<Source = any> extends IEsRawSearchResponse<Source> {
|
||||
response: SearchResponse<Source>;
|
||||
}
|
||||
|
||||
export const enhancedEsSearchStrategyProvider = (
|
||||
|
@ -37,82 +42,42 @@ export const enhancedEsSearchStrategyProvider = (
|
|||
logger: Logger,
|
||||
usage?: SearchUsage
|
||||
): ISearchStrategy => {
|
||||
const search = (
|
||||
function asyncSearch(
|
||||
request: IEnhancedEsSearchRequest,
|
||||
options: ISearchOptions,
|
||||
context: RequestHandlerContext
|
||||
) =>
|
||||
from(
|
||||
new Promise<IEsSearchResponse>(async (resolve, reject) => {
|
||||
logger.debug(`search ${JSON.stringify(request.params) || request.id}`);
|
||||
|
||||
const isAsync = request.indexType !== 'rollup';
|
||||
|
||||
try {
|
||||
const response = isAsync
|
||||
? await asyncSearch(request, options, context)
|
||||
: await rollupSearch(request, options, context);
|
||||
|
||||
if (
|
||||
usage &&
|
||||
isAsync &&
|
||||
isEnhancedEsSearchResponse(response) &&
|
||||
isCompleteResponse(response)
|
||||
) {
|
||||
usage.trackSuccess(response.rawResponse.took);
|
||||
}
|
||||
|
||||
resolve(response);
|
||||
} catch (e) {
|
||||
if (usage) usage.trackError();
|
||||
reject(e);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
const cancel = async (context: RequestHandlerContext, id: string) => {
|
||||
logger.debug(`cancel ${id}`);
|
||||
await context.core.elasticsearch.client.asCurrentUser.asyncSearch.delete({
|
||||
id,
|
||||
});
|
||||
};
|
||||
|
||||
async function asyncSearch(
|
||||
request: IEnhancedEsSearchRequest,
|
||||
options: ISearchOptions,
|
||||
context: RequestHandlerContext
|
||||
): Promise<IEsSearchResponse> {
|
||||
let promise: TransportRequestPromise<any>;
|
||||
const esClient = context.core.elasticsearch.client.asCurrentUser;
|
||||
const uiSettingsClient = await context.core.uiSettings.client;
|
||||
) {
|
||||
const asyncOptions = getAsyncOptions();
|
||||
const client = context.core.elasticsearch.client.asCurrentUser.asyncSearch;
|
||||
|
||||
// If we have an ID, then just poll for that ID, otherwise send the entire request body
|
||||
if (!request.id) {
|
||||
const submitOptions = toSnakeCase({
|
||||
batchedReduceSize: 64, // Only report partial results every 64 shards; this should be reduced when we actually display partial results
|
||||
...(await getDefaultSearchParams(uiSettingsClient)),
|
||||
...asyncOptions,
|
||||
...request.params,
|
||||
});
|
||||
|
||||
promise = esClient.asyncSearch.submit(submitOptions);
|
||||
} else {
|
||||
promise = esClient.asyncSearch.get({
|
||||
id: request.id,
|
||||
...toSnakeCase(asyncOptions),
|
||||
});
|
||||
}
|
||||
|
||||
const esResponse = await shimAbortSignal(promise, options?.abortSignal);
|
||||
const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body;
|
||||
return {
|
||||
id,
|
||||
isPartial,
|
||||
isRunning,
|
||||
rawResponse: shimHitsTotal(response),
|
||||
...getTotalLoaded(response._shards),
|
||||
};
|
||||
return doPartialSearch<ApiResponse<IEsRawAsyncSearchResponse>>(
|
||||
async () =>
|
||||
client.submit(
|
||||
utils.toSnakeCase({
|
||||
...(await getDefaultSearchParams(context.core.uiSettings.client)),
|
||||
batchedReduceSize: 64,
|
||||
...asyncOptions,
|
||||
...request.params,
|
||||
})
|
||||
),
|
||||
(id) =>
|
||||
client.get({
|
||||
id: id!,
|
||||
...utils.toSnakeCase({ ...asyncOptions }),
|
||||
}),
|
||||
(response) => !response.body.is_running,
|
||||
(response) => response.body.id,
|
||||
request.id,
|
||||
options
|
||||
).pipe(
|
||||
utils.toKibanaSearchResponse(),
|
||||
map((response) => ({
|
||||
...response,
|
||||
rawResponse: shimHitsTotal(response.rawResponse.response!),
|
||||
})),
|
||||
utils.trackSearchStatus(logger, usage),
|
||||
utils.includeTotalLoaded()
|
||||
);
|
||||
}
|
||||
|
||||
const rollupSearch = async function (
|
||||
|
@ -126,7 +91,7 @@ export const enhancedEsSearchStrategyProvider = (
|
|||
const { body, index, ...params } = request.params!;
|
||||
const method = 'POST';
|
||||
const path = encodeURI(`/${index}/_rollup_search`);
|
||||
const querystring = toSnakeCase({
|
||||
const querystring = utils.toSnakeCase({
|
||||
...getShardTimeout(config),
|
||||
...(await getDefaultSearchParams(uiSettingsClient)),
|
||||
...params,
|
||||
|
@ -139,14 +104,33 @@ export const enhancedEsSearchStrategyProvider = (
|
|||
querystring,
|
||||
});
|
||||
|
||||
const esResponse = await shimAbortSignal(promise, options?.abortSignal);
|
||||
const esResponse = await utils.shimAbortSignal(promise, options?.abortSignal);
|
||||
|
||||
const response = esResponse.body as SearchResponse<any>;
|
||||
return {
|
||||
rawResponse: response,
|
||||
...getTotalLoaded(response._shards),
|
||||
...utils.getTotalLoaded(response._shards),
|
||||
};
|
||||
};
|
||||
|
||||
return { search, cancel };
|
||||
return {
|
||||
search: (
|
||||
request: IEnhancedEsSearchRequest,
|
||||
options: ISearchOptions,
|
||||
context: RequestHandlerContext
|
||||
) => {
|
||||
logger.debug(`search ${JSON.stringify(request.params) || request.id}`);
|
||||
|
||||
return request.indexType !== 'rollup'
|
||||
? asyncSearch(request, options, context)
|
||||
: from(rollupSearch(request, options, context));
|
||||
},
|
||||
cancel: async (context: RequestHandlerContext, id: string) => {
|
||||
logger.debug(`cancel ${id}`);
|
||||
|
||||
await context.core.elasticsearch.client.asCurrentUser.asyncSearch.delete({
|
||||
id,
|
||||
});
|
||||
},
|
||||
};
|
||||
};
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { IUiSettingsClient } from 'src/core/server';
|
||||
import { UI_SETTINGS } from '../../../../../src/plugins/data/common';
|
||||
|
||||
import { getDefaultSearchParams as getBaseSearchParams } from '../../../../../src/plugins/data/server';
|
||||
|
||||
/**
|
||||
@internal
|
||||
*/
|
||||
export async function getDefaultSearchParams(uiSettingsClient: IUiSettingsClient) {
|
||||
const ignoreThrottled = !(await uiSettingsClient.get(UI_SETTINGS.SEARCH_INCLUDE_FROZEN));
|
||||
|
||||
return {
|
||||
ignoreThrottled,
|
||||
...(await getBaseSearchParams(uiSettingsClient)),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@internal
|
||||
*/
|
||||
export const getAsyncOptions = (): {
|
||||
waitForCompletionTimeout: string;
|
||||
keepAlive: string;
|
||||
} => ({
|
||||
waitForCompletionTimeout: '100ms', // Wait up to 100ms for the response to return
|
||||
keepAlive: '1m', // Extend the TTL for this search request by one minute,
|
||||
});
|
|
@ -6,7 +6,6 @@
|
|||
import { keyBy, isString } from 'lodash';
|
||||
import { ILegacyScopedClusterClient } from 'src/core/server';
|
||||
import { ReqFacade } from '../../../../../../src/plugins/vis_type_timeseries/server';
|
||||
import { ENHANCED_ES_SEARCH_STRATEGY } from '../../../../data_enhanced/server';
|
||||
import { mergeCapabilitiesWithFields } from '../merge_capabilities_with_fields';
|
||||
import { getCapabilitiesForRollupIndices } from '../map_capabilities';
|
||||
|
||||
|
@ -25,7 +24,7 @@ export const getRollupSearchStrategy = (
|
|||
name = 'rollup';
|
||||
|
||||
constructor() {
|
||||
super(ENHANCED_ES_SEARCH_STRATEGY, 'rollup', { rest_total_hits_as_int: true });
|
||||
super('rollup', { rest_total_hits_as_int: true });
|
||||
}
|
||||
|
||||
async search(req: ReqFacade, bodies: any[], options = {}) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue