mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
[reporting/csv] use max retry and connection timeout options for search (#139985)
* [reporting/csv] use transport options when pulling data * second attempt * update snapshot * Update src/plugins/data/common/search/types.ts * polish * update further search strategies and add tests * cleanup * use expect().toHaveBeenNthCalledWith()
This commit is contained in:
parent
7423ed0022
commit
f65e8139ce
18 changed files with 251 additions and 90 deletions
|
@ -16,6 +16,9 @@ export const EQL_SEARCH_STRATEGY = 'eql';
|
|||
export type EqlRequestParams = EqlSearchRequest;
|
||||
|
||||
export interface EqlSearchStrategyRequest extends IKibanaSearchRequest<EqlRequestParams> {
|
||||
/**
|
||||
* @deprecated: use IAsyncSearchOptions.transport instead.
|
||||
*/
|
||||
options?: TransportRequestOptions;
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import type { TransportRequestOptions } from '@elastic/elasticsearch';
|
||||
import type { KibanaExecutionContext } from '@kbn/core/public';
|
||||
import type { DataView } from '@kbn/data-views-plugin/common';
|
||||
import { Observable } from 'rxjs';
|
||||
|
@ -132,6 +134,12 @@ export interface ISearchOptions {
|
|||
* Index pattern reference is used for better error messages
|
||||
*/
|
||||
indexPattern?: DataView;
|
||||
|
||||
/**
|
||||
* TransportRequestOptions, other than `signal`, to pass through to the ES client.
|
||||
* To pass an abort signal, use {@link ISearchOptions.abortSignal}
|
||||
*/
|
||||
transport?: Omit<TransportRequestOptions, 'signal'>;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -10,6 +10,7 @@ import type { Logger } from '@kbn/core/server';
|
|||
import { eqlSearchStrategyProvider } from './eql_search_strategy';
|
||||
import { SearchStrategyDependencies } from '../../types';
|
||||
import { EqlSearchStrategyRequest } from '../../../../common';
|
||||
import { firstValueFrom } from 'rxjs';
|
||||
|
||||
const getMockEqlResponse = () => ({
|
||||
body: {
|
||||
|
@ -84,9 +85,15 @@ describe('EQL search strategy', () => {
|
|||
await eqlSearch.search({ options, params }, {}, mockDeps).toPromise();
|
||||
const [[request, requestOptions]] = mockEqlSearch.mock.calls;
|
||||
|
||||
expect(request.index).toEqual('logstash-*');
|
||||
expect(request.body).toEqual(expect.objectContaining({ query: 'process where 1 == 1' }));
|
||||
expect(requestOptions).toEqual(expect.objectContaining({ ignore: [400] }));
|
||||
expect(request).toEqual({
|
||||
body: { query: 'process where 1 == 1' },
|
||||
ignore_unavailable: true,
|
||||
index: 'logstash-*',
|
||||
keep_alive: '1m',
|
||||
max_concurrent_shard_requests: undefined,
|
||||
wait_for_completion_timeout: '100ms',
|
||||
});
|
||||
expect(requestOptions).toEqual({ ignore: [400], meta: true, signal: undefined });
|
||||
});
|
||||
|
||||
it('retrieves the current request if an id is provided', async () => {
|
||||
|
@ -95,7 +102,11 @@ describe('EQL search strategy', () => {
|
|||
const [[requestParams]] = mockEqlGet.mock.calls;
|
||||
|
||||
expect(mockEqlSearch).not.toHaveBeenCalled();
|
||||
expect(requestParams).toEqual(expect.objectContaining({ id: 'my-search-id' }));
|
||||
expect(requestParams).toEqual({
|
||||
id: 'my-search-id',
|
||||
keep_alive: '1m',
|
||||
wait_for_completion_timeout: '100ms',
|
||||
});
|
||||
});
|
||||
|
||||
it('emits an error if the client throws', async () => {
|
||||
|
@ -184,7 +195,7 @@ describe('EQL search strategy', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('passes transport options for an existing request', async () => {
|
||||
it('passes (deprecated) transport options for an existing request', async () => {
|
||||
const eqlSearch = await eqlSearchStrategyProvider(mockLogger);
|
||||
await eqlSearch
|
||||
.search({ id: 'my-search-id', options: { ignore: [400] } }, {}, mockDeps)
|
||||
|
@ -194,6 +205,42 @@ describe('EQL search strategy', () => {
|
|||
expect(mockEqlSearch).not.toHaveBeenCalled();
|
||||
expect(requestOptions).toEqual(expect.objectContaining({ ignore: [400] }));
|
||||
});
|
||||
|
||||
it('passes abort signal', async () => {
|
||||
const eqlSearch = eqlSearchStrategyProvider(mockLogger);
|
||||
const eql: EqlSearchStrategyRequest = { id: 'my-search-id' };
|
||||
const abortController = new AbortController();
|
||||
await firstValueFrom(
|
||||
eqlSearch.search(eql, { abortSignal: abortController.signal }, mockDeps)
|
||||
);
|
||||
const [[_params, requestOptions]] = mockEqlGet.mock.calls;
|
||||
|
||||
expect(requestOptions).toEqual({ meta: true, signal: expect.any(AbortSignal) });
|
||||
});
|
||||
|
||||
it('passes transport options for search with id', async () => {
|
||||
const eqlSearch = eqlSearchStrategyProvider(mockLogger);
|
||||
const eql: EqlSearchStrategyRequest = { id: 'my-search-id' };
|
||||
await firstValueFrom(
|
||||
eqlSearch.search(eql, { transport: { maxResponseSize: 13131313 } }, mockDeps)
|
||||
);
|
||||
const [[_params, requestOptions]] = mockEqlGet.mock.calls;
|
||||
|
||||
expect(requestOptions).toEqual({
|
||||
maxResponseSize: 13131313,
|
||||
meta: true,
|
||||
signal: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('passes transport options for search without id', async () => {
|
||||
const eqlSearch = eqlSearchStrategyProvider(mockLogger);
|
||||
const eql: EqlSearchStrategyRequest = { params: { index: 'all' } };
|
||||
await firstValueFrom(eqlSearch.search(eql, { transport: { ignore: [400] } }, mockDeps));
|
||||
const [[_params, requestOptions]] = mockEqlSearch.mock.calls;
|
||||
|
||||
expect(requestOptions).toEqual({ ignore: [400], meta: true, signal: undefined });
|
||||
});
|
||||
});
|
||||
|
||||
describe('response', () => {
|
||||
|
|
|
@ -56,12 +56,18 @@ export const eqlSearchStrategyProvider = (
|
|||
const response = id
|
||||
? await client.get(
|
||||
{ ...params, id },
|
||||
{ ...request.options, signal: options.abortSignal, meta: true }
|
||||
{
|
||||
...request.options,
|
||||
...options.transport,
|
||||
signal: options.abortSignal,
|
||||
meta: true,
|
||||
}
|
||||
)
|
||||
: // @ts-expect-error optional key cannot be used since search doesn't expect undefined
|
||||
await client.search(params as EqlSearchStrategyRequest['params'], {
|
||||
...request.options,
|
||||
abortController: { signal: options.abortSignal },
|
||||
...options.transport,
|
||||
signal: options.abortSignal,
|
||||
meta: true,
|
||||
});
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ import { SearchStrategyDependencies } from '../../types';
|
|||
import * as indexNotFoundException from '../../../../common/search/test_data/index_not_found_exception.json';
|
||||
import { errors } from '@elastic/elasticsearch';
|
||||
import { KbnServerError } from '@kbn/kibana-utils-plugin/server';
|
||||
import { firstValueFrom } from 'rxjs';
|
||||
|
||||
describe('ES search strategy', () => {
|
||||
const successBody = {
|
||||
|
@ -105,6 +106,19 @@ describe('ES search strategy', () => {
|
|||
done();
|
||||
}));
|
||||
|
||||
it('calls the client with transport options', async () => {
|
||||
const params = { index: 'logstash-*', ignore_unavailable: false, timeout: '1000ms' };
|
||||
await firstValueFrom(
|
||||
esSearchStrategyProvider(mockConfig$, mockLogger).search(
|
||||
{ params },
|
||||
{ transport: { maxRetries: 5 } },
|
||||
getMockedDeps()
|
||||
)
|
||||
);
|
||||
const [, searchOptions] = esClient.search.mock.calls[0];
|
||||
expect(searchOptions).toEqual({ signal: undefined, maxRetries: 5 });
|
||||
});
|
||||
|
||||
it('can be aborted', async () => {
|
||||
const params = { index: 'logstash-*', ignore_unavailable: false, timeout: '1000ms' };
|
||||
|
||||
|
@ -120,6 +134,7 @@ describe('ES search strategy', () => {
|
|||
...params,
|
||||
track_total_hits: true,
|
||||
});
|
||||
expect(esClient.search.mock.calls[0][1]).toEqual({ signal: expect.any(AbortSignal) });
|
||||
});
|
||||
|
||||
it('throws normalized error if ResponseError is thrown', async (done) => {
|
||||
|
|
|
@ -28,7 +28,7 @@ export const esSearchStrategyProvider = (
|
|||
* @throws `KbnServerError`
|
||||
* @returns `Observable<IEsSearchResponse<any>>`
|
||||
*/
|
||||
search: (request, { abortSignal, ...options }, { esClient, uiSettingsClient }) => {
|
||||
search: (request, { abortSignal, transport, ...options }, { esClient, uiSettingsClient }) => {
|
||||
// Only default index pattern type is supported here.
|
||||
// See ese for other type support.
|
||||
if (request.indexType) {
|
||||
|
@ -48,6 +48,7 @@ export const esSearchStrategyProvider = (
|
|||
};
|
||||
const body = await esClient.asCurrentUser.search(params, {
|
||||
signal: abortSignal,
|
||||
...transport,
|
||||
});
|
||||
const response = shimHitsTotal(body, options);
|
||||
return toKibanaSearchResponse(response);
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
import { BehaviorSubject } from 'rxjs';
|
||||
import { BehaviorSubject, firstValueFrom } from 'rxjs';
|
||||
import { KbnServerError } from '@kbn/kibana-utils-plugin/server';
|
||||
import { errors } from '@elastic/elasticsearch';
|
||||
import * as indexNotFoundException from '../../../../common/search/test_data/index_not_found_exception.json';
|
||||
|
@ -119,6 +119,53 @@ describe('ES search strategy', () => {
|
|||
expect(request).toHaveProperty('keep_alive', '1m');
|
||||
});
|
||||
|
||||
it('sets transport options on POST requests', async () => {
|
||||
const transportOptions = { maxRetries: 1 };
|
||||
mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse);
|
||||
const params = { index: 'logstash-*', body: { query: {} } };
|
||||
const esSearch = enhancedEsSearchStrategyProvider(mockLegacyConfig$, mockLogger);
|
||||
|
||||
await firstValueFrom(
|
||||
esSearch.search({ params }, { transport: transportOptions }, mockDeps)
|
||||
);
|
||||
|
||||
expect(mockSubmitCaller).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
expect.objectContaining({
|
||||
batched_reduce_size: 64,
|
||||
body: { query: {} },
|
||||
ignore_unavailable: true,
|
||||
index: 'logstash-*',
|
||||
keep_alive: '1m',
|
||||
keep_on_completion: false,
|
||||
max_concurrent_shard_requests: undefined,
|
||||
track_total_hits: true,
|
||||
wait_for_completion_timeout: '100ms',
|
||||
}),
|
||||
expect.objectContaining({ maxRetries: 1, meta: true, signal: undefined })
|
||||
);
|
||||
});
|
||||
|
||||
it('sets transport options on GET requests', async () => {
|
||||
mockGetCaller.mockResolvedValueOnce(mockAsyncResponse);
|
||||
const params = { index: 'logstash-*', body: { query: {} } };
|
||||
const esSearch = enhancedEsSearchStrategyProvider(mockLegacyConfig$, mockLogger);
|
||||
|
||||
await firstValueFrom(
|
||||
esSearch.search({ id: 'foo', params }, { transport: { maxRetries: 1 } }, mockDeps)
|
||||
);
|
||||
|
||||
expect(mockGetCaller).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
expect.objectContaining({
|
||||
id: 'foo',
|
||||
keep_alive: '1m',
|
||||
wait_for_completion_timeout: '100ms',
|
||||
}),
|
||||
expect.objectContaining({ maxRetries: 1, meta: true, signal: undefined })
|
||||
);
|
||||
});
|
||||
|
||||
it('sets wait_for_completion_timeout and keep_alive in the request', async () => {
|
||||
mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse);
|
||||
|
||||
|
|
|
@ -70,9 +70,10 @@ export const enhancedEsSearchStrategyProvider = (
|
|||
const { body, headers } = id
|
||||
? await client.asyncSearch.get(
|
||||
{ ...params, id },
|
||||
{ signal: options.abortSignal, meta: true }
|
||||
{ ...options.transport, signal: options.abortSignal, meta: true }
|
||||
)
|
||||
: await client.asyncSearch.submit(params, {
|
||||
...options.transport,
|
||||
signal: options.abortSignal,
|
||||
meta: true,
|
||||
});
|
||||
|
|
|
@ -72,14 +72,25 @@ describe('SQL search strategy', () => {
|
|||
};
|
||||
const esSearch = await sqlSearchStrategyProvider(mockLogger);
|
||||
|
||||
await esSearch.search({ params }, {}, mockDeps).toPromise();
|
||||
await esSearch
|
||||
.search({ params }, { transport: { requestTimeout: 30000 } }, mockDeps)
|
||||
.toPromise();
|
||||
|
||||
expect(mockSqlQuery).toBeCalled();
|
||||
const request = mockSqlQuery.mock.calls[0][0];
|
||||
expect(request.query).toEqual(params.query);
|
||||
expect(request).toHaveProperty('format', 'json');
|
||||
expect(request).toHaveProperty('keep_alive', '1m');
|
||||
expect(request).toHaveProperty('wait_for_completion_timeout');
|
||||
const [request, searchOptions] = mockSqlQuery.mock.calls[0];
|
||||
expect(request).toEqual({
|
||||
format: 'json',
|
||||
keep_alive: '1m',
|
||||
keep_on_completion: undefined,
|
||||
query:
|
||||
'SELECT customer_first_name FROM kibana_sample_data_ecommerce ORDER BY order_date DESC',
|
||||
wait_for_completion_timeout: '100ms',
|
||||
});
|
||||
expect(searchOptions).toEqual({
|
||||
meta: true,
|
||||
requestTimeout: 30000,
|
||||
signal: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('makes a GET request to async search with ID', async () => {
|
||||
|
@ -92,14 +103,23 @@ describe('SQL search strategy', () => {
|
|||
|
||||
const esSearch = await sqlSearchStrategyProvider(mockLogger);
|
||||
|
||||
await esSearch.search({ id: 'foo', params }, {}, mockDeps).toPromise();
|
||||
await esSearch
|
||||
.search({ id: 'foo', params }, { transport: { requestTimeout: 30000 } }, mockDeps)
|
||||
.toPromise();
|
||||
|
||||
expect(mockSqlGetAsync).toBeCalled();
|
||||
const request = mockSqlGetAsync.mock.calls[0][0];
|
||||
expect(request.id).toEqual('foo');
|
||||
expect(request).toHaveProperty('wait_for_completion_timeout');
|
||||
expect(request).toHaveProperty('keep_alive', '1m');
|
||||
expect(request).toHaveProperty('format', 'json');
|
||||
const [request, searchOptions] = mockSqlGetAsync.mock.calls[0];
|
||||
expect(request).toEqual({
|
||||
format: 'json',
|
||||
id: 'foo',
|
||||
keep_alive: '1m',
|
||||
wait_for_completion_timeout: '100ms',
|
||||
});
|
||||
expect(searchOptions).toEqual({
|
||||
meta: true,
|
||||
requestTimeout: 30000,
|
||||
signal: undefined,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -59,10 +59,7 @@ export const sqlSearchStrategyProvider = (
|
|||
...getDefaultAsyncGetParams(sessionConfig, options),
|
||||
id,
|
||||
},
|
||||
{
|
||||
signal: options.abortSignal,
|
||||
meta: true,
|
||||
}
|
||||
{ ...options.transport, signal: options.abortSignal, meta: true }
|
||||
));
|
||||
} else {
|
||||
({ headers, body } = await client.sql.query(
|
||||
|
@ -71,10 +68,7 @@ export const sqlSearchStrategyProvider = (
|
|||
...getDefaultAsyncSubmitParams(sessionConfig, options),
|
||||
...params,
|
||||
},
|
||||
{
|
||||
signal: options.abortSignal,
|
||||
meta: true,
|
||||
}
|
||||
{ ...options.transport, signal: options.abortSignal, meta: true }
|
||||
));
|
||||
}
|
||||
|
||||
|
|
|
@ -79,7 +79,12 @@ describe('Reporting server createConfig$', () => {
|
|||
"capture": Object {
|
||||
"maxAttempts": 1,
|
||||
},
|
||||
"csv": Object {},
|
||||
"csv": Object {
|
||||
"scroll": Object {
|
||||
"duration": "30s",
|
||||
"size": 500,
|
||||
},
|
||||
},
|
||||
"encryptionKey": "iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
|
||||
"index": ".reporting",
|
||||
"kibanaServer": Object {
|
||||
|
|
|
@ -16,11 +16,11 @@ export const runTaskFnFactory: RunTaskFnFactory<RunTaskFn<TaskPayloadCSV>> = (
|
|||
parentLogger
|
||||
) => {
|
||||
const config = reporting.getConfig();
|
||||
const encryptionKey = config.get('encryptionKey');
|
||||
const csvConfig = config.get('csv');
|
||||
|
||||
return async function runTask(jobId, job, cancellationToken, stream) {
|
||||
const logger = parentLogger.get(`execute-job:${jobId}`);
|
||||
|
||||
const encryptionKey = config.get('encryptionKey');
|
||||
const headers = await decryptJobHeaders(encryptionKey, job.headers, logger);
|
||||
const fakeRequest = reporting.getFakeRequest({ headers }, job.spaceId, logger);
|
||||
const uiSettings = await reporting.getUiSettingsClient(fakeRequest, logger);
|
||||
|
@ -44,7 +44,7 @@ export const runTaskFnFactory: RunTaskFnFactory<RunTaskFn<TaskPayloadCSV>> = (
|
|||
|
||||
const csv = new CsvGenerator(
|
||||
job,
|
||||
config,
|
||||
csvConfig,
|
||||
clients,
|
||||
dependencies,
|
||||
cancellationToken,
|
||||
|
|
|
@ -7,9 +7,7 @@
|
|||
|
||||
import { errors as esErrors } from '@elastic/elasticsearch';
|
||||
import type { SearchResponse } from '@elastic/elasticsearch/lib/api/types';
|
||||
import type { Logger, IScopedClusterClient, IUiSettingsClient } from '@kbn/core/server';
|
||||
import { identity, range } from 'lodash';
|
||||
import * as Rx from 'rxjs';
|
||||
import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server';
|
||||
import {
|
||||
elasticsearchServiceMock,
|
||||
loggingSystemMock,
|
||||
|
@ -21,14 +19,17 @@ import { searchSourceInstanceMock } from '@kbn/data-plugin/common/search/search_
|
|||
import { IScopedSearchClient } from '@kbn/data-plugin/server';
|
||||
import { dataPluginMock } from '@kbn/data-plugin/server/mocks';
|
||||
import { FieldFormatsRegistry } from '@kbn/field-formats-plugin/common';
|
||||
import { Writable } from 'stream';
|
||||
import { ReportingConfig } from '../../..';
|
||||
import { identity, range } from 'lodash';
|
||||
import * as Rx from 'rxjs';
|
||||
import type { Writable } from 'stream';
|
||||
import type { DeepPartial } from 'utility-types';
|
||||
import { CancellationToken } from '../../../../common/cancellation_token';
|
||||
import {
|
||||
UI_SETTINGS_CSV_QUOTE_VALUES,
|
||||
UI_SETTINGS_CSV_SEPARATOR,
|
||||
UI_SETTINGS_DATEFORMAT_TZ,
|
||||
} from '../../../../common/constants';
|
||||
import { ReportingConfigType } from '../../../config';
|
||||
import { createMockConfig, createMockConfigSchema } from '../../../test_helpers';
|
||||
import { JobParamsCSV } from '../types';
|
||||
import { CsvGenerator } from './generate_csv';
|
||||
|
@ -39,7 +40,7 @@ const createMockJob = (baseObj: any = {}): JobParamsCSV => ({
|
|||
|
||||
let mockEsClient: IScopedClusterClient;
|
||||
let mockDataClient: IScopedSearchClient;
|
||||
let mockConfig: ReportingConfig;
|
||||
let mockConfig: ReportingConfigType['csv'];
|
||||
let mockLogger: jest.Mocked<Logger>;
|
||||
let uiSettingsClient: IUiSettingsClient;
|
||||
let stream: jest.Mocked<Writable>;
|
||||
|
@ -79,6 +80,11 @@ const mockFieldFormatsRegistry = {
|
|||
.mockImplementation(() => ({ id: 'string', convert: jest.fn().mockImplementation(identity) })),
|
||||
} as unknown as FieldFormatsRegistry;
|
||||
|
||||
const getMockConfig = (properties: DeepPartial<ReportingConfigType> = {}) => {
|
||||
const config = createMockConfig(createMockConfigSchema(properties));
|
||||
return config.get('csv');
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
content = '';
|
||||
stream = { write: jest.fn((chunk) => (content += chunk)) } as unknown as typeof stream;
|
||||
|
@ -100,16 +106,14 @@ beforeEach(async () => {
|
|||
}
|
||||
});
|
||||
|
||||
mockConfig = createMockConfig(
|
||||
createMockConfigSchema({
|
||||
csv: {
|
||||
checkForFormulas: true,
|
||||
escapeFormulaValues: true,
|
||||
maxSizeBytes: 180000,
|
||||
scroll: { size: 500, duration: '30s' },
|
||||
},
|
||||
})
|
||||
);
|
||||
mockConfig = getMockConfig({
|
||||
csv: {
|
||||
checkForFormulas: true,
|
||||
escapeFormulaValues: true,
|
||||
maxSizeBytes: 180000,
|
||||
scroll: { size: 500, duration: '30s' },
|
||||
},
|
||||
});
|
||||
|
||||
searchSourceMock.getField = jest.fn((key: string) => {
|
||||
switch (key) {
|
||||
|
@ -231,17 +235,14 @@ it('calculates the bytes of the content', async () => {
|
|||
|
||||
it('warns if max size was reached', async () => {
|
||||
const TEST_MAX_SIZE = 500;
|
||||
|
||||
mockConfig = createMockConfig(
|
||||
createMockConfigSchema({
|
||||
csv: {
|
||||
checkForFormulas: true,
|
||||
escapeFormulaValues: true,
|
||||
maxSizeBytes: TEST_MAX_SIZE,
|
||||
scroll: { size: 500, duration: '30s' },
|
||||
},
|
||||
})
|
||||
);
|
||||
mockConfig = getMockConfig({
|
||||
csv: {
|
||||
checkForFormulas: true,
|
||||
escapeFormulaValues: true,
|
||||
maxSizeBytes: TEST_MAX_SIZE,
|
||||
scroll: { size: 500, duration: '30s' },
|
||||
},
|
||||
});
|
||||
|
||||
mockDataClient.search = jest.fn().mockImplementation(() =>
|
||||
Rx.of({
|
||||
|
@ -300,6 +301,7 @@ it('uses the scrollId to page all the data', async () => {
|
|||
},
|
||||
})
|
||||
);
|
||||
|
||||
mockEsClient.asCurrentUser.scroll = jest.fn().mockResolvedValue({
|
||||
hits: {
|
||||
hits: range(0, HITS_TOTAL / 10).map(() => ({
|
||||
|
@ -335,7 +337,7 @@ it('uses the scrollId to page all the data', async () => {
|
|||
expect(mockDataClient.search).toHaveBeenCalledTimes(1);
|
||||
expect(mockDataClient.search).toBeCalledWith(
|
||||
{ params: { body: {}, ignore_throttled: undefined, scroll: '30s', size: 500 } },
|
||||
{ strategy: 'es' }
|
||||
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
|
||||
);
|
||||
|
||||
// `scroll` and `clearScroll` must be called with scroll ID in the post body!
|
||||
|
@ -729,16 +731,14 @@ describe('formulas', () => {
|
|||
});
|
||||
|
||||
it('can check for formulas, without escaping them', async () => {
|
||||
mockConfig = createMockConfig(
|
||||
createMockConfigSchema({
|
||||
csv: {
|
||||
checkForFormulas: true,
|
||||
escapeFormulaValues: false,
|
||||
maxSizeBytes: 180000,
|
||||
scroll: { size: 500, duration: '30s' },
|
||||
},
|
||||
})
|
||||
);
|
||||
mockConfig = getMockConfig({
|
||||
csv: {
|
||||
checkForFormulas: true,
|
||||
escapeFormulaValues: false,
|
||||
maxSizeBytes: 180000,
|
||||
scroll: { size: 500, duration: '30s' },
|
||||
},
|
||||
});
|
||||
mockDataClient.search = jest.fn().mockImplementation(() =>
|
||||
Rx.of({
|
||||
rawResponse: {
|
||||
|
@ -804,8 +804,15 @@ it('can override ignoring frozen indices', async () => {
|
|||
await generateCsv.generateData();
|
||||
|
||||
expect(mockDataClient.search).toBeCalledWith(
|
||||
{ params: { body: {}, ignore_throttled: false, scroll: '30s', size: 500 } },
|
||||
{ strategy: 'es' }
|
||||
{
|
||||
params: {
|
||||
body: {},
|
||||
ignore_throttled: false,
|
||||
scroll: '30s',
|
||||
size: 500,
|
||||
},
|
||||
},
|
||||
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
|
||||
);
|
||||
});
|
||||
|
||||
|
|
|
@ -24,11 +24,11 @@ import type {
|
|||
} from '@kbn/field-formats-plugin/common';
|
||||
import { lastValueFrom } from 'rxjs';
|
||||
import type { Writable } from 'stream';
|
||||
import type { ReportingConfig } from '../../..';
|
||||
import type { CancellationToken } from '../../../../common/cancellation_token';
|
||||
import { CONTENT_TYPE_CSV } from '../../../../common/constants';
|
||||
import { AuthenticationExpiredError, ReportingError } from '../../../../common/errors';
|
||||
import { byteSizeValueToNumber } from '../../../../common/schema_utils';
|
||||
import { ReportingConfigType } from '../../../config';
|
||||
import type { TaskRunResult } from '../../../lib/tasks';
|
||||
import type { JobParamsCSV } from '../types';
|
||||
import { CsvExportSettings, getExportSettings } from './get_export_settings';
|
||||
|
@ -53,7 +53,7 @@ export class CsvGenerator {
|
|||
|
||||
constructor(
|
||||
private job: Omit<JobParamsCSV, 'version'>,
|
||||
private config: ReportingConfig,
|
||||
private config: ReportingConfigType['csv'],
|
||||
private clients: Clients,
|
||||
private dependencies: Dependencies,
|
||||
private cancellationToken: CancellationToken,
|
||||
|
@ -84,7 +84,13 @@ export class CsvGenerator {
|
|||
try {
|
||||
results = (
|
||||
await lastValueFrom(
|
||||
this.clients.data.search(searchParams, { strategy: ES_SEARCH_STRATEGY })
|
||||
this.clients.data.search(searchParams, {
|
||||
strategy: ES_SEARCH_STRATEGY,
|
||||
transport: {
|
||||
maxRetries: 0, // retrying reporting jobs is handled in the task manager scheduling logic
|
||||
requestTimeout: this.config.scroll.duration,
|
||||
},
|
||||
})
|
||||
)
|
||||
).rawResponse as estypes.SearchResponse<unknown>;
|
||||
} catch (err) {
|
||||
|
|
|
@ -22,7 +22,7 @@ import { getExportSettings } from './get_export_settings';
|
|||
|
||||
describe('getExportSettings', () => {
|
||||
let uiSettingsClient: IUiSettingsClient;
|
||||
const config = createMockConfig(createMockConfigSchema({}));
|
||||
const config = createMockConfig(createMockConfigSchema({})).get('csv');
|
||||
const logger = loggingSystemMock.createLogger();
|
||||
|
||||
beforeEach(() => {
|
||||
|
@ -55,8 +55,8 @@ describe('getExportSettings', () => {
|
|||
"includeFrozen": false,
|
||||
"maxSizeBytes": undefined,
|
||||
"scroll": Object {
|
||||
"duration": undefined,
|
||||
"size": undefined,
|
||||
"duration": "30s",
|
||||
"size": 500,
|
||||
},
|
||||
"separator": ",",
|
||||
"timezone": "UTC",
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
import { ByteSizeValue } from '@kbn/config-schema';
|
||||
import type { IUiSettingsClient, Logger } from '@kbn/core/server';
|
||||
import { createEscapeValue } from '@kbn/data-plugin/common';
|
||||
import { ReportingConfig } from '../../..';
|
||||
import {
|
||||
CSV_BOM_CHARS,
|
||||
UI_SETTINGS_CSV_QUOTE_VALUES,
|
||||
|
@ -16,6 +15,7 @@ import {
|
|||
UI_SETTINGS_DATEFORMAT_TZ,
|
||||
UI_SETTINGS_SEARCH_INCLUDE_FROZEN,
|
||||
} from '../../../../common/constants';
|
||||
import { ReportingConfigType } from '../../../config';
|
||||
|
||||
export interface CsvExportSettings {
|
||||
timezone: string;
|
||||
|
@ -34,7 +34,7 @@ export interface CsvExportSettings {
|
|||
|
||||
export const getExportSettings = async (
|
||||
client: IUiSettingsClient,
|
||||
config: ReportingConfig,
|
||||
config: ReportingConfigType['csv'],
|
||||
timezone: string | undefined,
|
||||
logger: Logger
|
||||
): Promise<CsvExportSettings> => {
|
||||
|
@ -60,21 +60,21 @@ export const getExportSettings = async (
|
|||
client.get(UI_SETTINGS_CSV_QUOTE_VALUES),
|
||||
]);
|
||||
|
||||
const escapeFormulaValues = config.get('csv', 'escapeFormulaValues');
|
||||
const escapeFormulaValues = config.escapeFormulaValues;
|
||||
const escapeValue = createEscapeValue(quoteValues, escapeFormulaValues);
|
||||
const bom = config.get('csv', 'useByteOrderMarkEncoding') ? CSV_BOM_CHARS : '';
|
||||
const bom = config.useByteOrderMarkEncoding ? CSV_BOM_CHARS : '';
|
||||
|
||||
return {
|
||||
timezone: setTimezone,
|
||||
scroll: {
|
||||
size: config.get('csv', 'scroll', 'size'),
|
||||
duration: config.get('csv', 'scroll', 'duration'),
|
||||
size: config.scroll.size,
|
||||
duration: config.scroll.duration,
|
||||
},
|
||||
bom,
|
||||
includeFrozen,
|
||||
separator,
|
||||
maxSizeBytes: config.get('csv', 'maxSizeBytes'),
|
||||
checkForFormulas: config.get('csv', 'checkForFormulas'),
|
||||
maxSizeBytes: config.maxSizeBytes,
|
||||
checkForFormulas: config.checkForFormulas,
|
||||
escapeFormulaValues,
|
||||
escapeValue,
|
||||
};
|
||||
|
|
|
@ -30,7 +30,7 @@ export const runTaskFnFactory: RunTaskFnFactory<ImmediateExecuteFn> = function e
|
|||
reporting,
|
||||
parentLogger
|
||||
) {
|
||||
const config = reporting.getConfig();
|
||||
const config = reporting.getConfig().get('csv');
|
||||
const logger = parentLogger.get('execute-job');
|
||||
|
||||
return async function runTask(_jobId, immediateJobParams, context, stream, req) {
|
||||
|
@ -39,9 +39,9 @@ export const runTaskFnFactory: RunTaskFnFactory<ImmediateExecuteFn> = function e
|
|||
...immediateJobParams,
|
||||
};
|
||||
|
||||
const dataPluginStart = await reporting.getDataService();
|
||||
const savedObjectsClient = (await context.core).savedObjects.client;
|
||||
const uiSettings = await reporting.getUiSettingsServiceFactory(savedObjectsClient);
|
||||
const dataPluginStart = await reporting.getDataService();
|
||||
const fieldFormatsRegistry = await getFieldFormats().fieldFormatServiceFactory(uiSettings);
|
||||
|
||||
const [es, searchSourceStart] = await Promise.all([
|
||||
|
|
|
@ -113,6 +113,7 @@ export const createMockConfigSchema = (
|
|||
...overrides.queue,
|
||||
},
|
||||
csv: {
|
||||
scroll: { size: 500, duration: '30s' },
|
||||
...overrides.csv,
|
||||
},
|
||||
roles: {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue