async es|ql search strategy (#174246)

This commit is contained in:
Peter Pisljar 2024-01-29 13:13:47 +01:00 committed by GitHub
parent 57b4efa419
commit 9908db47d3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 567 additions and 35 deletions

View file

@ -23,7 +23,12 @@ import { buildEsQuery } from '@kbn/es-query';
import type { ESQLSearchReponse, ESQLSearchParams } from '@kbn/es-types';
import { getEsQueryConfig } from '../../es_query';
import { getTime } from '../../query';
import { ESQL_SEARCH_STRATEGY, IKibanaSearchRequest, ISearchGeneric, KibanaContext } from '..';
import {
ESQL_ASYNC_SEARCH_STRATEGY,
IKibanaSearchRequest,
ISearchGeneric,
KibanaContext,
} from '..';
import { IKibanaSearchResponse } from '../types';
import { UiSettingsCommon } from '../..';
@ -189,7 +194,7 @@ export const getEsqlFn = ({ getStartDependencies }: EsqlFnArguments) => {
return search<
IKibanaSearchRequest<ESQLSearchParams>,
IKibanaSearchResponse<ESQLSearchReponse>
>({ params }, { abortSignal, strategy: ESQL_SEARCH_STRATEGY }).pipe(
>({ params }, { abortSignal, strategy: ESQL_ASYNC_SEARCH_STRATEGY }).pipe(
catchError((error) => {
if (!error.attributes) {
error.message = `Unexpected error from Elasticsearch: ${error.message}`;

View file

@ -7,3 +7,4 @@
*/
export const ESQL_SEARCH_STRATEGY = 'esql';
export const ESQL_ASYNC_SEARCH_STRATEGY = 'esql_async';

View file

@ -81,6 +81,7 @@ import {
eqlRawResponse,
SQL_SEARCH_STRATEGY,
ESQL_SEARCH_STRATEGY,
ESQL_ASYNC_SEARCH_STRATEGY,
} from '../../common/search';
import { getEsaggs, getEsdsl, getEssql, getEql, getEsql } from './expressions';
import {
@ -98,6 +99,7 @@ import { CachedUiSettingsClient } from './services';
import { sqlSearchStrategyProvider } from './strategies/sql_search';
import { searchSessionSavedObjectType } from './saved_objects';
import { esqlSearchStrategyProvider } from './strategies/esql_search';
import { esqlAsyncSearchStrategyProvider } from './strategies/esql_async_search';
type StrategyMap = Record<string, ISearchStrategy<any, any>>;
@ -180,6 +182,10 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
)
);
this.registerSearchStrategy(ESQL_SEARCH_STRATEGY, esqlSearchStrategyProvider(this.logger));
this.registerSearchStrategy(
ESQL_ASYNC_SEARCH_STRATEGY,
esqlAsyncSearchStrategyProvider(this.initializerContext.config.get().search, this.logger)
);
// We don't want to register this because we don't want the client to be able to access this
// strategy, but we do want to expose it to other server-side plugins

View file

@ -0,0 +1,351 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { firstValueFrom } from 'rxjs';
import { KbnServerError } from '@kbn/kibana-utils-plugin/server';
import { KbnSearchError } from '../../report_search_error';
import { errors } from '@elastic/elasticsearch';
import * as indexNotFoundException from '../../../../common/search/test_data/index_not_found_exception.json';
import * as xContentParseException from '../../../../common/search/test_data/x_content_parse_exception.json';
import { SearchStrategyDependencies } from '../../types';
import { esqlAsyncSearchStrategyProvider } from './esql_async_search_strategy';
import { getMockSearchConfig } from '../../../../config.mock';
const mockAsyncResponse = {
body: {
id: 'foo',
response: {
_shards: {
total: 10,
failed: 1,
skipped: 2,
successful: 7,
},
},
},
};
describe('ES|QL async search strategy', () => {
const mockApiCaller = jest.fn();
const mockLogger: any = {
debug: () => {},
};
const mockDeps = {
uiSettingsClient: {
get: jest.fn(),
},
esClient: {
asCurrentUser: {
transport: { request: mockApiCaller },
},
},
} as unknown as SearchStrategyDependencies;
const mockSearchConfig = getMockSearchConfig({});
beforeEach(() => {
mockApiCaller.mockClear();
});
it('returns a strategy with `search and `cancel`', async () => {
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
expect(typeof esSearch.search).toBe('function');
});
describe('search', () => {
describe('no sessionId', () => {
it('makes a POST request with params when no ID provided', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
const params = {
query: 'from logs* | limit 10',
};
await esSearch
.search(
{
id: undefined,
params,
},
{},
mockDeps
)
.toPromise();
expect(mockApiCaller).toBeCalled();
const request = mockApiCaller.mock.calls[0][0].body;
expect(request.query).toEqual(params.query);
expect(request).toHaveProperty('keep_alive', '60000ms');
});
it('makes a GET request to async search with ID', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
const params = {
query: 'from logs* | limit 10',
};
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
await esSearch.search({ id: 'foo', params }, {}, mockDeps).toPromise();
expect(mockApiCaller).toBeCalled();
const request = mockApiCaller.mock.calls[0][0];
expect(request.path).toContain('foo');
expect(request.querystring).toHaveProperty('wait_for_completion_timeout');
expect(request.querystring).toHaveProperty('keep_alive', '60000ms');
});
it('allows overriding keep_alive and wait_for_completion_timeout', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
const params = {
query: 'from logs* | limit 10',
wait_for_completion_timeout: '10s',
keep_alive: '5m',
};
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
await esSearch.search({ id: 'foo', params }, {}, mockDeps).toPromise();
expect(mockApiCaller).toBeCalled();
const request = mockApiCaller.mock.calls[0][0];
expect(request.path).toContain('foo');
expect(request.querystring).toHaveProperty('wait_for_completion_timeout', '10s');
expect(request.querystring).toHaveProperty('keep_alive', '5m');
});
it('sets transport options on POST requests', async () => {
const transportOptions = { maxRetries: 1 };
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
const params = {};
const esSearch = esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
await firstValueFrom(
esSearch.search({ params }, { transport: transportOptions }, mockDeps)
);
expect(mockApiCaller).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
method: 'POST',
path: '/_query/async',
body: {
keep_alive: '60000ms',
wait_for_completion_timeout: '100ms',
keep_on_completion: false,
},
}),
expect.objectContaining({ maxRetries: 1, meta: true, signal: undefined })
);
});
it('sets transport options on GET requests', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
const params = {
query: 'from logs* | limit 10',
};
const esSearch = esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
await firstValueFrom(
esSearch.search({ id: 'foo', params }, { transport: { maxRetries: 1 } }, mockDeps)
);
expect(mockApiCaller).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
path: '/_query/async/foo',
querystring: {
keep_alive: '60000ms',
wait_for_completion_timeout: '100ms',
},
}),
expect.objectContaining({ maxRetries: 1, meta: true, signal: undefined })
);
});
it('sets wait_for_completion_timeout and keep_alive in the request', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
const params = {
query: 'from logs* | limit 10',
};
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
await esSearch.search({ params }, {}, mockDeps).toPromise();
expect(mockApiCaller).toBeCalled();
const request = mockApiCaller.mock.calls[0][0].body;
expect(request).toHaveProperty('wait_for_completion_timeout');
expect(request).toHaveProperty('keep_alive');
});
it('should delete when aborted', async () => {
mockApiCaller.mockResolvedValueOnce({
...mockAsyncResponse,
body: {
...mockAsyncResponse.body,
is_running: true,
},
});
const params = {
query: 'from logs* | limit 10',
};
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
const abortController = new AbortController();
const abortSignal = abortController.signal;
// Abort after an incomplete first response is returned
setTimeout(() => abortController.abort(), 100);
let err: KbnServerError | undefined;
try {
await esSearch.search({ params }, { abortSignal }, mockDeps).toPromise();
} catch (e) {
err = e;
}
expect(mockApiCaller).toBeCalled();
expect(err).not.toBeUndefined();
expect(mockApiCaller).toBeCalled();
});
});
it('throws normalized error if ResponseError is thrown', async () => {
const errResponse = new errors.ResponseError({
body: indexNotFoundException,
statusCode: 404,
headers: {},
warnings: [],
meta: {} as any,
});
mockApiCaller.mockRejectedValue(errResponse);
const params = {
query: 'from logs* | limit 10',
};
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
let err: KbnSearchError | undefined;
try {
await esSearch.search({ params }, {}, mockDeps).toPromise();
} catch (e) {
err = e;
}
expect(mockApiCaller).toBeCalled();
expect(err).toBeInstanceOf(KbnSearchError);
expect(err?.statusCode).toBe(404);
expect(err?.message).toBe(errResponse.message);
expect(err?.errBody).toBe(indexNotFoundException);
});
it('throws normalized error if Error is thrown', async () => {
const errResponse = new Error('not good');
mockApiCaller.mockRejectedValue(errResponse);
const params = {
query: 'from logs* | limit 10',
};
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
let err: KbnSearchError | undefined;
try {
await esSearch.search({ params }, {}, mockDeps).toPromise();
} catch (e) {
err = e;
}
expect(mockApiCaller).toBeCalled();
expect(err).toBeInstanceOf(KbnSearchError);
expect(err?.statusCode).toBe(500);
expect(err?.message).toBe(errResponse.message);
expect(err?.errBody).toBe(undefined);
});
});
describe('cancel', () => {
it('makes a DELETE request to async search with the provided ID', async () => {
mockApiCaller.mockResolvedValueOnce(200);
const id = 'some_id';
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
await esSearch.cancel!(id, {}, mockDeps);
expect(mockApiCaller).toBeCalled();
const request = mockApiCaller.mock.calls[0][0];
expect(request.path).toContain(id);
});
it('throws normalized error on ResponseError', async () => {
const errResponse = new errors.ResponseError({
body: xContentParseException,
statusCode: 400,
headers: {},
warnings: [],
meta: {} as any,
});
mockApiCaller.mockRejectedValue(errResponse);
const id = 'some_id';
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
let err: KbnServerError | undefined;
try {
await esSearch.cancel!(id, {}, mockDeps);
} catch (e) {
err = e;
}
expect(mockApiCaller).toBeCalled();
expect(err).toBeInstanceOf(KbnServerError);
expect(err?.statusCode).toBe(400);
expect(err?.message).toBe(errResponse.message);
expect(err?.errBody).toBe(xContentParseException);
});
});
describe('extend', () => {
it('makes a GET request to async search with the provided ID and keepAlive', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
const id = 'some_other_id';
const keepAlive = '1d';
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
await esSearch.extend!(id, keepAlive, {}, mockDeps);
expect(mockApiCaller).toBeCalled();
const request = mockApiCaller.mock.calls[0][0];
expect(request.body).toEqual({ id, keep_alive: keepAlive });
});
it('throws normalized error on ElasticsearchClientError', async () => {
const errResponse = new errors.ElasticsearchClientError('something is wrong with EsClient');
mockApiCaller.mockRejectedValue(errResponse);
const id = 'some_other_id';
const keepAlive = '1d';
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
let err: KbnServerError | undefined;
try {
await esSearch.extend!(id, keepAlive, {}, mockDeps);
} catch (e) {
err = e;
}
expect(mockApiCaller).toBeCalled();
expect(err).toBeInstanceOf(KbnServerError);
expect(err?.statusCode).toBe(500);
expect(err?.message).toBe(errResponse.message);
expect(err?.errBody).toBe(undefined);
});
});
});

View file

@ -0,0 +1,161 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { catchError, tap } from 'rxjs/operators';
import { getKbnServerError } from '@kbn/kibana-utils-plugin/server';
import { SqlQueryRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { SqlGetAsyncResponse } from '@elastic/elasticsearch/lib/api/types';
import {
getCommonDefaultAsyncSubmitParams,
getCommonDefaultAsyncGetParams,
} from '../common/async_utils';
import { getKbnSearchError } from '../../report_search_error';
import type { ISearchStrategy, SearchStrategyDependencies } from '../../types';
import type { IAsyncSearchOptions } from '../../../../common';
import { IKibanaSearchRequest, IKibanaSearchResponse, pollSearch } from '../../../../common';
import { toAsyncKibanaSearchResponse } from './response_utils';
import { SearchConfigSchema } from '../../../../config';
export const esqlAsyncSearchStrategyProvider = (
searchConfig: SearchConfigSchema,
logger: Logger,
useInternalUser: boolean = false
): ISearchStrategy<
IKibanaSearchRequest<SqlQueryRequest['body']>,
IKibanaSearchResponse<SqlGetAsyncResponse>
> => {
function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
return client.transport.request(
{
method: 'DELETE',
path: `/_query/async/${id}`,
},
{
meta: true,
// we don't want the ES client to retry (default value is 3)
maxRetries: 0,
}
);
}
function asyncSearch(
{ id, ...request }: IKibanaSearchRequest<SqlQueryRequest['body']>,
options: IAsyncSearchOptions,
{ esClient, uiSettingsClient }: SearchStrategyDependencies
) {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
const search = async () => {
const params = id
? {
...getCommonDefaultAsyncGetParams(searchConfig, options),
...(request.params?.keep_alive ? { keep_alive: request.params.keep_alive } : {}),
...(request.params?.wait_for_completion_timeout
? { wait_for_completion_timeout: request.params.wait_for_completion_timeout }
: {}),
}
: {
...(await getCommonDefaultAsyncSubmitParams(searchConfig, options)),
...request.params,
};
const { body, headers, meta } = id
? await client.transport.request<SqlGetAsyncResponse>(
{ method: 'GET', path: `/_query/async/${id}`, querystring: { ...params } },
{ ...options.transport, signal: options.abortSignal, meta: true }
)
: await client.transport.request<SqlGetAsyncResponse>(
{ method: 'POST', path: `/_query/async`, body: params },
{ ...options.transport, signal: options.abortSignal, meta: true }
);
const finalResponse = toAsyncKibanaSearchResponse(
body,
headers?.warning,
// do not return requestParams on polling calls
id ? undefined : meta?.request?.params
);
return finalResponse;
};
const cancel = async () => {
if (!id || options.isStored) return;
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
// A 404 means either this search request does not exist, or that it is already cancelled
if (e.meta?.statusCode === 404) return;
// Log all other (unexpected) error messages
logger.error(`cancelEsqlAsyncSearch error: ${e.message}`);
}
};
return pollSearch(search, cancel, {
pollInterval: searchConfig.asyncSearch.pollInterval,
...options,
}).pipe(
tap((response) => (id = response.id)),
catchError((e) => {
throw getKbnSearchError(e);
})
);
}
return {
/**
* @param request
* @param options
* @param deps `SearchStrategyDependencies`
* @returns `Observable<IKibanaResponse<SqlGetAsyncResponse>>`
* @throws `KbnSearchError`
*/
search: (request, options: IAsyncSearchOptions, deps) => {
logger.debug(`search ${JSON.stringify(request) || request.id}`);
return asyncSearch(request, options, deps);
},
/**
* @param id async search ID to cancel, as returned from _async_search API
* @param options
* @param deps `SearchStrategyDependencies`
* @returns `Promise<void>`
* @throws `KbnServerError`
*/
cancel: async (id, options, { esClient }) => {
logger.debug(`cancel ${id}`);
try {
await cancelAsyncSearch(id, esClient);
} catch (e) {
throw getKbnServerError(e);
}
},
/**
*
* @param id async search ID to extend, as returned from _async_search API
* @param keepAlive
* @param options
* @param deps `SearchStrategyDependencies`
* @returns `Promise<void>`
* @throws `KbnServerError`
*/
extend: async (id, keepAlive, options, { esClient }) => {
logger.debug(`extend ${id} by ${keepAlive}`);
try {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
await client.transport.request(
{ method: 'GET', path: `/_query/async/${id}`, body: { id, keep_alive: keepAlive } },
{ ...options.transport, signal: options.abortSignal, meta: true }
);
} catch (e) {
throw getKbnServerError(e);
}
},
};
};

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export { esqlAsyncSearchStrategyProvider } from './esql_async_search_strategy';

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { ConnectionRequestParams } from '@elastic/transport';
import { SqlGetAsyncResponse } from '@elastic/elasticsearch/lib/api/types';
import { IKibanaSearchResponse } from '../../../../common';
import { sanitizeRequestParams } from '../../sanitize_request_params';
/**
* Get the Kibana representation of an async search response (see `IKibanaSearchResponse`).
*/
export function toAsyncKibanaSearchResponse(
response: SqlGetAsyncResponse,
warning?: string,
requestParams?: ConnectionRequestParams
): IKibanaSearchResponse<SqlGetAsyncResponse> {
return {
id: response.id,
rawResponse: {
...response,
},
isPartial: response.is_partial,
isRunning: response.is_running,
...(warning ? { warning } : {}),
...(requestParams ? { requestParams: sanitizeRequestParams(requestParams) } : {}),
};
}

View file

@ -22,7 +22,6 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) {
]);
const testSubjects = getService('testSubjects');
const browser = getService('browser');
const monacoEditor = getService('monacoEditor');
const filterBar = getService('filterBar');
const queryBar = getService('queryBar');
const elasticChart = getService('elasticChart');
@ -225,37 +224,5 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) {
});
});
});
describe('ES|QL mode', () => {
const type = 'esql';
beforeEach(async () => {
await PageObjects.discover.selectTextBaseLang();
monacoEditor.setCodeEditorValue(
'from logstash-* | where bytes > 1000 | stats countB = count(bytes)'
);
await queryBar.clickQuerySubmitButton();
await waitForLoadingToFinish();
});
getSharedTests({
type,
savedSearch: 'esql test',
query1: 'from logstash-* | where bytes > 1000 | stats countB = count(bytes) ',
query2: 'from logstash-* | where bytes < 2000 | stats countB = count(bytes) ',
savedSearchesRequests: 2,
setQuery: (query) => monacoEditor.setCodeEditorValue(query),
expectedRequests: 1,
});
it(`should send 2 requests (documents + chart) when toggling the chart visibility`, async () => {
await expectSearches(type, 2, async () => {
await PageObjects.discover.toggleChartVisibility();
});
await expectSearches(type, 1, async () => {
await PageObjects.discover.toggleChartVisibility();
});
});
});
});
}