[Reporting] Support 'auto' value for csv scroll duration config (#175005)

## Summary

Closes https://github.com/elastic/kibana/issues/174988


### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
Eyo O. Eyo 2024-02-06 15:14:45 +01:00 committed by GitHub
parent 9ccd7cc571
commit 92b6fd64cd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 702 additions and 78 deletions


@@ -232,10 +232,10 @@ You may need to lower this setting if the default number of documents creates a
============
`xpack.reporting.csv.scroll.duration`::
Amount of {time-units}[time] allowed before {kib} cleans the scroll context during a CSV export. Defaults to `30s`.
Amount of {time-units}[time] allowed before {kib} cleans the scroll context during a CSV export. Valid values are either `auto` or a {time-units}[time] value. Defaults to `30s`.
[NOTE]
============
If search latency in {es} is sufficiently high, such as if you are using {ccs}, you may need to increase the setting.
If search latency in {es} is sufficiently high, such as if you are using {ccs}, you may need to either increase the time setting or set this config value to `auto`. When set to `auto`, the scroll context is preserved for as long as possible, before the report task is terminated due to the limits of `xpack.reporting.queue.timeout`.
============
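For operators, opting in is a one-line change in `kibana.yml`. This is a sketch; the explicit duration form remains valid, and `2m` below is just an illustrative value:

```yaml
# Keep the scroll context alive for as long as the report task is
# allowed to run, instead of a fixed window:
xpack.reporting.csv.scroll.duration: auto

# ...or keep an explicit window (the default is 30s):
# xpack.reporting.csv.scroll.duration: 2m
```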
`xpack.reporting.csv.scroll.strategy`::


@@ -226,6 +226,14 @@ exports[`CsvGenerator Scroll strategy uses the scroll context to page all the da
"
`;
exports[`CsvGenerator export behavior when scroll duration config is auto csv gets generated if search resolves without errors before the computed timeout value passed to the search data client elapses 1`] = `
"a,b
a1,b1
a1,b1
a1,b1
"
`;
exports[`CsvGenerator fields from job.columns (7.13+ generated) cells can be multi-value 1`] = `
"product,category
coconut,\\"cool, rad\\"


@@ -9,6 +9,7 @@
import { identity, range } from 'lodash';
import * as Rx from 'rxjs';
import type { Writable } from 'stream';
import { add, type Duration } from 'date-fns';
import { errors as esErrors, estypes } from '@elastic/elasticsearch';
import type { SearchResponse } from '@elastic/elasticsearch/lib/api/types';
@@ -21,6 +22,8 @@ import {
} from '@kbn/core/server/mocks';
import { ISearchClient, ISearchStartSearchSource } from '@kbn/data-plugin/common';
import { searchSourceInstanceMock } from '@kbn/data-plugin/common/search/search_source/mocks';
import type { IScopedSearchClient } from '@kbn/data-plugin/server';
import type { IKibanaSearchResponse } from '@kbn/data-plugin/common';
import { dataPluginMock } from '@kbn/data-plugin/server/mocks';
import { FieldFormatsRegistry } from '@kbn/field-formats-plugin/common';
import { CancellationToken } from '@kbn/reporting-common';
@@ -360,7 +363,11 @@ describe('CsvGenerator', () => {
expect(mockDataClient.search).toHaveBeenCalledTimes(10);
expect(mockDataClient.search).toBeCalledWith(
{ params: { body: {}, ignore_throttled: undefined, max_concurrent_shard_requests: 5 } },
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '30s' },
}
);
expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledTimes(1);
@@ -370,7 +377,12 @@ describe('CsvGenerator', () => {
index: 'logstash-*',
keep_alive: '30s',
},
{ maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
{
maxConcurrentShardRequests: 5,
maxRetries: 0,
requestTimeout: '30s',
signal: expect.any(AbortSignal),
}
);
expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledTimes(1);
@@ -548,6 +560,203 @@ describe('CsvGenerator', () => {
});
});
describe('export behavior when scroll duration config is auto', () => {
const getTaskInstanceFields = (intervalFromNow: Duration) => {
const now = new Date(Date.now());
return { startedAt: now, retryAt: add(now, intervalFromNow) };
};
let mockConfigWithAutoScrollDuration: ReportingConfigType['csv'];
let mockDataClientSearchFn: jest.MockedFunction<IScopedSearchClient['search']>;
beforeEach(() => {
mockConfigWithAutoScrollDuration = {
...mockConfig,
scroll: {
...mockConfig.scroll,
duration: 'auto',
},
};
mockDataClientSearchFn = jest.fn();
jest.useFakeTimers();
});
afterEach(() => {
jest.clearAllTimers();
jest.useRealTimers();
mockDataClientSearchFn.mockRestore();
});
it('csv gets generated if search resolves without errors before the computed timeout value passed to the search data client elapses', async () => {
const timeFromNowInMs = 4 * 60 * 1000;
const taskInstanceFields = getTaskInstanceFields({
seconds: timeFromNowInMs / 1000,
});
mockDataClientSearchFn.mockImplementation((_, options) => {
const getSearchResult = () => {
const queuedAt = Date.now();
return new Promise<IKibanaSearchResponse<ReturnType<typeof getMockRawResponse>>>(
(resolve, reject) => {
setTimeout(() => {
if (
new Date(Date.now()).getTime() - new Date(queuedAt).getTime() >
Number((options?.transport?.requestTimeout! as string).replace(/ms/, ''))
) {
reject(
new esErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] })
);
} else {
resolve({
rawResponse: getMockRawResponse(
[
{
fields: { a: ['a1'], b: ['b1'] },
} as unknown as estypes.SearchHit,
],
3
),
});
}
}, timeFromNowInMs / 4);
}
);
};
return Rx.defer(getSearchResult);
});
const generateCsvPromise = new CsvGenerator(
createMockJob({ searchSource: {}, columns: ['a', 'b'] }),
mockConfigWithAutoScrollDuration,
taskInstanceFields,
{
es: mockEsClient,
data: {
...mockDataClient,
search: mockDataClientSearchFn,
},
uiSettings: uiSettingsClient,
},
{
searchSourceStart: mockSearchSourceService,
fieldFormatsRegistry: mockFieldFormatsRegistry,
},
new CancellationToken(),
mockLogger,
stream
).generateData();
await jest.advanceTimersByTimeAsync(timeFromNowInMs);
expect(await generateCsvPromise).toEqual(
expect.objectContaining({
warnings: [],
})
);
expect(mockDataClientSearchFn).toBeCalledWith(
{ params: { body: {}, ignore_throttled: undefined, max_concurrent_shard_requests: 5 } },
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: `${timeFromNowInMs}ms` },
}
);
expect(content).toMatchSnapshot();
});
it('csv generation errors if search request does not resolve before the computed timeout value passed to the search data client elapses', async () => {
const timeFromNowInMs = 4 * 60 * 1000;
const taskInstanceFields = getTaskInstanceFields({
seconds: timeFromNowInMs / 1000,
});
const requestDuration = timeFromNowInMs + 1000;
mockDataClientSearchFn.mockImplementation((_, options) => {
const getSearchResult = () => {
const queuedAt = Date.now();
return new Promise<IKibanaSearchResponse<ReturnType<typeof getMockRawResponse>>>(
(resolve, reject) => {
setTimeout(() => {
if (
new Date(Date.now()).getTime() - new Date(queuedAt).getTime() >
Number((options?.transport?.requestTimeout! as string).replace(/ms/, ''))
) {
reject(
new esErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] })
);
} else {
resolve({
rawResponse: getMockRawResponse(
[
{
fields: { a: ['a1'], b: ['b1'] },
} as unknown as estypes.SearchHit,
],
3
),
});
}
}, requestDuration);
}
);
};
return Rx.defer(getSearchResult);
});
const generateCsvPromise = new CsvGenerator(
createMockJob({ searchSource: {}, columns: ['a', 'b'] }),
mockConfigWithAutoScrollDuration,
taskInstanceFields,
{
es: mockEsClient,
data: {
...mockDataClient,
search: mockDataClientSearchFn,
},
uiSettings: uiSettingsClient,
},
{
searchSourceStart: mockSearchSourceService,
fieldFormatsRegistry: mockFieldFormatsRegistry,
},
new CancellationToken(),
mockLogger,
stream
).generateData();
await jest.advanceTimersByTimeAsync(requestDuration);
expect(await generateCsvPromise).toEqual(
expect.objectContaining({
warnings: expect.arrayContaining([
expect.stringContaining('Received a 408 response from Elasticsearch'),
]),
})
);
expect(mockDataClientSearchFn).toBeCalledWith(
{ params: { body: {}, ignore_throttled: undefined, max_concurrent_shard_requests: 5 } },
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: `${timeFromNowInMs}ms` },
}
);
});
});
describe('Scroll strategy', () => {
const mockJobUsingScrollPaging = createMockJob({
columns: ['date', 'ip', 'message'],
@@ -654,7 +863,11 @@ describe('CsvGenerator', () => {
max_concurrent_shard_requests: 5,
}),
},
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '30s' },
}
);
expect(mockEsClient.asCurrentUser.openPointInTime).not.toHaveBeenCalled();
@@ -1200,17 +1413,12 @@ describe('CsvGenerator', () => {
index: 'logstash-*',
keep_alive: '30s',
},
{ maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
);
expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith(
{
ignore_unavailable: true,
ignore_throttled: false,
index: 'logstash-*',
keep_alive: '30s',
},
{ maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
maxConcurrentShardRequests: 5,
maxRetries: 0,
requestTimeout: '30s',
signal: expect.any(AbortSignal),
}
);
expect(mockDataClient.search).toBeCalledWith(
@@ -1220,7 +1428,11 @@ describe('CsvGenerator', () => {
max_concurrent_shard_requests: 5,
},
},
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '30s' },
}
);
});


@@ -21,9 +21,9 @@ import type {
} from '@kbn/field-formats-plugin/common';
import {
AuthenticationExpiredError,
byteSizeValueToNumber,
CancellationToken,
ReportingError,
byteSizeValueToNumber,
} from '@kbn/reporting-common';
import type { TaskInstanceFields, TaskRunResult } from '@kbn/reporting-common/types';
import type { ReportingConfigType } from '@kbn/reporting-server';
@@ -63,7 +63,6 @@ export class CsvGenerator {
private logger: Logger,
private stream: Writable
) {}
/*
* Load field formats for each field in the list
*/
@@ -180,9 +179,9 @@
/*
* Intrinsically, generating the rows is a synchronous process. Awaiting
* on a setImmediate call here partititions what could be a very long and
* CPU-intenstive synchronous process into asychronous processes. This
* give NodeJS to process other asychronous events that wait on the Event
* on a setImmediate call here partitions what could be a very long and
* CPU-intensive synchronous process into asynchronous processes. This
* give NodeJS to process other asynchronous events that wait on the Event
* Loop.
*
* See: https://nodejs.org/en/docs/guides/dont-block-the-event-loop/
@@ -225,7 +224,13 @@
public async generateData(): Promise<TaskRunResult> {
const logger = this.logger;
const [settings, searchSource] = await Promise.all([
getExportSettings(this.clients.uiSettings, this.config, this.job.browserTimezone, logger),
getExportSettings(
this.clients.uiSettings,
this.taskInstanceFields,
this.config,
this.job.browserTimezone,
logger
),
this.dependencies.searchSourceStart.create(this.job.searchSource),
]);
@@ -252,15 +257,30 @@
let totalRecords: number | undefined;
let reportingError: undefined | ReportingError;
const abortController = new AbortController();
this.cancellationToken.on(() => abortController.abort());
// use a class to internalize the paging strategy
let cursor: SearchCursor;
if (this.job.pagingStrategy === 'scroll') {
// Optional strategy: scan-and-scroll
cursor = new SearchCursorScroll(indexPatternTitle, settings, this.clients, this.logger);
cursor = new SearchCursorScroll(
indexPatternTitle,
settings,
this.clients,
abortController,
this.logger
);
logger.debug('Using search strategy: scroll');
} else {
// Default strategy: point-in-time
cursor = new SearchCursorPit(indexPatternTitle, settings, this.clients, this.logger);
cursor = new SearchCursorPit(
indexPatternTitle,
settings,
this.clients,
abortController,
this.logger
);
logger.debug('Using search strategy: pit');
}
await cursor.initialize();
@@ -289,6 +309,7 @@
if (this.cancellationToken.isCancelled()) {
break;
}
searchSource.setField('size', settings.scroll.size);
let results: estypes.SearchResponse<unknown> | undefined;
@@ -406,7 +427,7 @@
/*
* Add the errors into the CSV content. This makes error messages more
* discoverable. When the export was automated or triggered by an API
* call or is automated, the user doesn't necesssarily go through the
* call or is automated, the user doesn't necessarily go through the
* Kibana UI to download the export and might not otherwise see the
* error message.
*/


@@ -8,6 +8,7 @@
import * as Rx from 'rxjs';
import type { Writable } from 'stream';
import { add, type Duration } from 'date-fns';
import { errors as esErrors } from '@elastic/elasticsearch';
import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server';
@@ -20,9 +21,9 @@ import {
import { IKibanaSearchResponse } from '@kbn/data-plugin/common';
import { IScopedSearchClient } from '@kbn/data-plugin/server';
import { dataPluginMock } from '@kbn/data-plugin/server/mocks';
import type { ESQLSearchReponse } from '@kbn/es-types';
import { CancellationToken } from '@kbn/reporting-common';
import type { ReportingConfigType } from '@kbn/reporting-server';
import type { ESQLSearchReponse as ESQLSearchResponse } from '@kbn/es-types';
import {
UI_SETTINGS_CSV_QUOTE_VALUES,
UI_SETTINGS_CSV_SEPARATOR,
@@ -37,6 +38,8 @@ const createMockJob = (
query: { esql: '' },
});
const mockTaskInstanceFields = { startedAt: null, retryAt: null };
describe('CsvESQLGenerator', () => {
let mockEsClient: IScopedClusterClient;
let mockDataClient: IScopedSearchClient;
@@ -47,20 +50,20 @@ describe('CsvESQLGenerator', () => {
let content: string;
const getMockRawResponse = (
esqlResponse: ESQLSearchReponse = {
esqlResponse: ESQLSearchResponse = {
columns: [],
values: [],
}
): ESQLSearchReponse => esqlResponse;
): ESQLSearchResponse => esqlResponse;
const mockDataClientSearchDefault = jest.fn().mockImplementation(
(): Rx.Observable<IKibanaSearchResponse<ESQLSearchReponse>> =>
(): Rx.Observable<IKibanaSearchResponse<ESQLSearchResponse>> =>
Rx.of({
rawResponse: getMockRawResponse(),
})
);
const mockSearchResponse = (response: ESQLSearchReponse) => {
const mockSearchResponse = (response: ESQLSearchResponse) => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: getMockRawResponse(response),
@@ -105,6 +108,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob({ columns: ['date', 'ip', 'message'] }),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -136,6 +140,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob(),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -163,6 +168,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob(),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -192,6 +198,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob(),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -211,6 +218,189 @@ describe('CsvESQLGenerator', () => {
`);
});
describe('"auto" scroll duration config', () => {
const getTaskInstanceFields = (intervalFromNow: Duration) => {
const now = new Date(Date.now());
return { startedAt: now, retryAt: add(now, intervalFromNow) };
};
let mockConfigWithAutoScrollDuration: ReportingConfigType['csv'];
let mockDataClientSearchFn: jest.MockedFunction<IScopedSearchClient['search']>;
beforeEach(() => {
mockConfigWithAutoScrollDuration = {
...mockConfig,
scroll: {
...mockConfig.scroll,
duration: 'auto',
},
};
mockDataClientSearchFn = jest.fn();
jest.useFakeTimers();
});
afterEach(() => {
jest.clearAllTimers();
jest.useRealTimers();
mockDataClientSearchFn.mockRestore();
});
it('csv gets generated if search resolves without errors before the computed timeout value passed to the search data client elapses', async () => {
const timeFromNowInMs = 4 * 60 * 1000;
const taskInstanceFields = getTaskInstanceFields({
seconds: timeFromNowInMs / 1000,
});
mockDataClientSearchFn.mockImplementation((_, options) => {
const getSearchResult = () => {
const queuedAt = Date.now();
return new Promise<IKibanaSearchResponse<ReturnType<typeof getMockRawResponse>>>(
(resolve, reject) => {
setTimeout(() => {
if (
new Date(Date.now()).getTime() - new Date(queuedAt).getTime() >
Number((options?.transport?.requestTimeout! as string).replace(/ms/, ''))
) {
reject(
new esErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] })
);
} else {
resolve({
rawResponse: getMockRawResponse({
columns: [{ name: 'message', type: 'string' }],
values: Array(100).fill(['This is a great message!']),
}),
});
}
}, timeFromNowInMs / 4);
}
);
};
return Rx.defer(getSearchResult);
});
const generateCsvPromise = new CsvESQLGenerator(
createMockJob(),
mockConfigWithAutoScrollDuration,
taskInstanceFields,
{
es: mockEsClient,
data: {
...mockDataClient,
search: mockDataClientSearchFn,
},
uiSettings: uiSettingsClient,
},
new CancellationToken(),
mockLogger,
stream
).generateData();
await jest.advanceTimersByTimeAsync(timeFromNowInMs);
expect(await generateCsvPromise).toEqual(
expect.objectContaining({
warnings: [],
})
);
expect(mockDataClientSearchFn).toBeCalledWith(
{ params: { filter: undefined, locale: 'en', query: '' } },
{
strategy: 'esql',
transport: {
requestTimeout: `${timeFromNowInMs}ms`,
},
abortSignal: expect.any(AbortSignal),
}
);
});
it('csv generation errors if search request does not resolve before the computed timeout value passed to the search data client elapses', async () => {
const timeFromNowInMs = 4 * 60 * 1000;
const taskInstanceFields = getTaskInstanceFields({
seconds: timeFromNowInMs / 1000,
});
const requestDuration = timeFromNowInMs + 1000;
mockDataClientSearchFn.mockImplementation((_, options) => {
const getSearchResult = () => {
const queuedAt = Date.now();
return new Promise<IKibanaSearchResponse<ReturnType<typeof getMockRawResponse>>>(
(resolve, reject) => {
setTimeout(() => {
if (
new Date(Date.now()).getTime() - new Date(queuedAt).getTime() >
Number((options?.transport?.requestTimeout! as string).replace(/ms/, ''))
) {
reject(
new esErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] })
);
} else {
resolve({
rawResponse: getMockRawResponse({
columns: [{ name: 'message', type: 'string' }],
values: Array(100).fill(['This is a great message!']),
}),
});
}
}, requestDuration);
}
);
};
return Rx.defer(getSearchResult);
});
const generateCsvPromise = new CsvESQLGenerator(
createMockJob(),
mockConfigWithAutoScrollDuration,
taskInstanceFields,
{
es: mockEsClient,
data: {
...mockDataClient,
search: mockDataClientSearchFn,
},
uiSettings: uiSettingsClient,
},
new CancellationToken(),
mockLogger,
stream
).generateData();
await jest.advanceTimersByTimeAsync(requestDuration);
expect(await generateCsvPromise).toEqual(
expect.objectContaining({
warnings: expect.arrayContaining([
expect.stringContaining('Received a 408 response from Elasticsearch'),
]),
})
);
expect(mockDataClientSearchFn).toBeCalledWith(
{ params: { filter: undefined, locale: 'en', query: '' } },
{
strategy: 'esql',
transport: {
requestTimeout: `${timeFromNowInMs}ms`,
},
abortSignal: expect.any(AbortSignal),
}
);
});
});
describe('jobParams', () => {
it('uses columns to select columns', async () => {
mockSearchResponse({
@@ -225,6 +415,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob({ columns: ['message', 'date', 'something else'] }),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -259,6 +450,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob({ query, filters }),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -318,6 +510,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob(),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -347,6 +540,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob(),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -385,6 +579,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob(),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -413,6 +608,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob(),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,
@@ -449,6 +645,7 @@ describe('CsvESQLGenerator', () => {
const generateCsv = new CsvESQLGenerator(
createMockJob(),
mockConfig,
mockTaskInstanceFields,
{
es: mockEsClient,
data: mockDataClient,


@@ -30,6 +30,7 @@ import {
} from '@kbn/reporting-common';
import type { TaskRunResult } from '@kbn/reporting-common/types';
import type { ReportingConfigType } from '@kbn/reporting-server';
import { type TaskInstanceFields } from '@kbn/reporting-common/types';
import { zipObject } from 'lodash';
import { CONTENT_TYPE_CSV } from '../constants';
@@ -58,6 +59,7 @@ export class CsvESQLGenerator {
constructor(
private job: JobParamsCsvESQL,
private config: ReportingConfigType['csv'],
private taskInstanceFields: TaskInstanceFields,
private clients: Clients,
private cancellationToken: CancellationToken,
private logger: Logger,
@@ -67,6 +69,7 @@
public async generateData(): Promise<TaskRunResult> {
const settings = await getExportSettings(
this.clients.uiSettings,
this.taskInstanceFields,
this.config,
this.job.browserTimezone,
this.logger
@@ -111,7 +114,7 @@
strategy: ESQL_SEARCH_STRATEGY,
abortSignal: abortController.signal,
transport: {
requestTimeout: settings.scroll.duration,
requestTimeout: settings.scroll.duration(this.taskInstanceFields),
},
})
);


@@ -13,6 +13,8 @@ import {
uiSettingsServiceMock,
} from '@kbn/core/server/mocks';
import type { ReportingConfigType } from '@kbn/reporting-server';
import type { TaskInstanceFields } from '@kbn/reporting-common/types';
import { sub, add, type Duration } from 'date-fns';
import {
UI_SETTINGS_CSV_QUOTE_VALUES,
@@ -25,6 +27,7 @@ import { getExportSettings } from './get_export_settings';
describe('getExportSettings', () => {
let uiSettingsClient: IUiSettingsClient;
let config: ReportingConfigType['csv'];
let taskInstanceFields: TaskInstanceFields;
const logger = loggingSystemMock.createLogger();
beforeEach(() => {
@@ -38,6 +41,8 @@ describe('getExportSettings', () => {
enablePanelActionDownload: true,
};
taskInstanceFields = { startedAt: null, retryAt: null };
uiSettingsClient = uiSettingsServiceMock
.createStartContract()
.asScopedToClient(savedObjectsClientMock.create());
@@ -53,31 +58,42 @@ describe('getExportSettings', () => {
return false;
}
return 'helo world';
return 'hello world';
});
});
test('getExportSettings: returns the expected result', async () => {
expect(await getExportSettings(uiSettingsClient, config, '', logger)).toMatchObject({
bom: '',
checkForFormulas: true,
escapeFormulaValues: false,
includeFrozen: false,
maxConcurrentShardRequests: 5,
maxSizeBytes: 180000,
scroll: {
duration: '30s',
size: 500,
},
separator: ',',
timezone: 'UTC',
});
expect(await getExportSettings(uiSettingsClient, taskInstanceFields, config, '', logger))
.toMatchInlineSnapshot(`
Object {
"bom": "",
"checkForFormulas": true,
"escapeFormulaValues": false,
"escapeValue": [Function],
"includeFrozen": false,
"maxConcurrentShardRequests": 5,
"maxSizeBytes": 180000,
"scroll": Object {
"duration": [Function],
"size": 500,
"strategy": "pit",
},
"separator": ",",
"taskInstanceFields": Object {
"retryAt": null,
"startedAt": null,
},
"timezone": "UTC",
}
`);
});
test('does not add a default scroll strategy', async () => {
// @ts-expect-error undefined isn't allowed
config = { ...config, scroll: { strategy: undefined } };
expect(await getExportSettings(uiSettingsClient, config, '', logger)).toMatchObject(
expect(
await getExportSettings(uiSettingsClient, taskInstanceFields, config, '', logger)
).toMatchObject(
expect.objectContaining({ scroll: expect.objectContaining({ strategy: undefined }) })
);
});
@@ -85,7 +101,9 @@ describe('getExportSettings', () => {
test('passes the scroll=pit strategy through', async () => {
config = { ...config, scroll: { ...config.scroll, strategy: 'pit' } };
expect(await getExportSettings(uiSettingsClient, config, '', logger)).toMatchObject(
expect(
await getExportSettings(uiSettingsClient, taskInstanceFields, config, '', logger)
).toMatchObject(
expect.objectContaining({ scroll: expect.objectContaining({ strategy: 'pit' }) })
);
});
@@ -93,7 +111,9 @@
test('passes the scroll=scroll strategy through', async () => {
config = { ...config, scroll: { ...config.scroll, strategy: 'scroll' } };
expect(await getExportSettings(uiSettingsClient, config, '', logger)).toMatchObject(
expect(
await getExportSettings(uiSettingsClient, taskInstanceFields, config, '', logger)
).toMatchObject(
expect.objectContaining({
scroll: expect.objectContaining({
strategy: 'scroll',
@@ -103,7 +123,13 @@
});
test('escapeValue function', async () => {
const { escapeValue } = await getExportSettings(uiSettingsClient, config, '', logger);
const { escapeValue } = await getExportSettings(
uiSettingsClient,
taskInstanceFields,
config,
'',
logger
);
expect(escapeValue(`test`)).toBe(`test`);
expect(escapeValue(`this is, a test`)).toBe(`"this is, a test"`);
expect(escapeValue(`"tet"`)).toBe(`"""tet"""`);
@@ -119,7 +145,111 @@
});
expect(
await getExportSettings(uiSettingsClient, config, '', logger).then(({ timezone }) => timezone)
await getExportSettings(uiSettingsClient, taskInstanceFields, config, '', logger).then(
({ timezone }) => timezone
)
).toBe(`America/Aruba`);
});
describe('scroll duration function', () => {
let spiedDateNow: jest.Spied<typeof Date.now>;
let mockedTaskInstanceFields: TaskInstanceFields;
const durationApart: Duration = { minutes: 5 };
beforeEach(() => {
const now = Date.now();
// freeze time for test
spiedDateNow = jest.spyOn(Date, 'now').mockReturnValue(now);
mockedTaskInstanceFields = {
startedAt: sub(new Date(Date.now()), durationApart),
retryAt: add(new Date(Date.now()), durationApart),
};
});
afterEach(() => {
spiedDateNow.mockRestore();
});
it('returns its specified value when value is not auto', async () => {
const { scroll } = await getExportSettings(
uiSettingsClient,
taskInstanceFields,
config,
'',
logger
);
expect(scroll.duration(mockedTaskInstanceFields)).toBe(config.scroll.duration);
});
it('throws when the scroll duration config is auto and retryAt value of the taskInstanceField passed is falsy', async () => {
const configWithScrollAutoDuration = {
...config,
scroll: {
...config.scroll,
duration: 'auto',
},
};
const { scroll } = await getExportSettings(
uiSettingsClient,
taskInstanceFields,
configWithScrollAutoDuration,
'',
logger
);
expect(
scroll.duration.bind(null, { startedAt: new Date(Date.now()), retryAt: null })
).toThrow();
});
it('returns a value that is the difference of the current time from the value of retryAt provided in the passed taskInstanceFields', async () => {
const configWithScrollAutoDuration = {
...config,
scroll: {
...config.scroll,
duration: 'auto',
},
};
const { scroll } = await getExportSettings(
uiSettingsClient,
taskInstanceFields,
configWithScrollAutoDuration,
'',
logger
);
expect(scroll.duration(mockedTaskInstanceFields)).toBe(
`${durationApart.minutes! * 60 * 1000}ms`
);
});
it('returns 0 if current time exceeds the value of retryAt provided in the passed taskInstanceFields', async () => {
const configWithScrollAutoDuration = {
...config,
scroll: {
...config.scroll,
duration: 'auto',
},
};
spiedDateNow.mockReturnValue(
add(mockedTaskInstanceFields.retryAt!, { minutes: 5 }).getTime()
);
const { scroll } = await getExportSettings(
uiSettingsClient,
taskInstanceFields,
configWithScrollAutoDuration,
'',
logger
);
expect(scroll.duration(mockedTaskInstanceFields)).toBe('0ms');
});
});
});


@@ -10,7 +10,7 @@ import type { ByteSizeValue } from '@kbn/config-schema';
import type { IUiSettingsClient, Logger } from '@kbn/core/server';
import { createEscapeValue } from '@kbn/data-plugin/common';
import type { ReportingConfigType } from '@kbn/reporting-server';
import type { TaskInstanceFields } from '@kbn/reporting-common/types';
import {
CSV_BOM_CHARS,
UI_SETTINGS_CSV_QUOTE_VALUES,
@@ -22,10 +22,14 @@ import { CsvPagingStrategy } from '../../types';
export interface CsvExportSettings {
timezone: string;
taskInstanceFields: TaskInstanceFields;
scroll: {
strategy?: CsvPagingStrategy;
size: number;
duration: string;
/**
* Computes the scroll duration; the result is formatted in 'ms' by default.
*/
duration: (args: TaskInstanceFields, format?: 'ms' | 's') => string;
};
bom: string;
separator: string;
@@ -39,6 +43,7 @@ export interface CsvExportSettings {
export const getExportSettings = async (
client: IUiSettingsClient,
taskInstanceFields: TaskInstanceFields,
config: ReportingConfigType['csv'],
timezone: string | undefined,
logger: Logger
@@ -75,10 +80,33 @@
return {
timezone: setTimezone,
taskInstanceFields,
scroll: {
strategy: config.scroll.strategy as CsvPagingStrategy,
size: config.scroll.size,
duration: config.scroll.duration,
duration: ({ retryAt }, format = 'ms') => {
if (config.scroll.duration !== 'auto') {
return config.scroll.duration;
}
if (!retryAt) {
throw new Error(
'config "xpack.reporting.csv.scroll.duration" of "auto" mandates that the task instance field passed specifies a retryAt value'
);
}
const now = Date.now();
const retryAtTime = new Date(retryAt).getTime();
if (now >= retryAtTime) {
return `0${format}`;
}
const durationMs = retryAtTime - now;
const result = format === 'ms' ? `${durationMs}ms` : `${durationMs / 1000}s`;
logger.debug(`using timeout duration of ${result} for csv scroll`);
return result;
},
},
bom,
includeFrozen,

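The `auto` branch above derives the keep-alive from the time remaining until the task's `retryAt`. A minimal standalone sketch of that logic, using the hypothetical names `makeDurationResolver` and `TaskInstanceFieldsLike` (illustrative only, not the actual Kibana exports):

```typescript
// Sketch only: mirrors the 'auto' scroll-duration logic from getExportSettings.
// makeDurationResolver and TaskInstanceFieldsLike are illustrative names.
interface TaskInstanceFieldsLike {
  startedAt: Date | null;
  retryAt: Date | null;
}

function makeDurationResolver(configured: string) {
  return (fields: TaskInstanceFieldsLike, format: 'ms' | 's' = 'ms'): string => {
    // Fixed values (e.g. '30s') are passed through to ES unchanged.
    if (configured !== 'auto') return configured;
    if (!fields.retryAt) {
      throw new Error("'auto' requires taskInstanceFields.retryAt to be set");
    }
    const remaining = new Date(fields.retryAt).getTime() - Date.now();
    // Clamp at zero once retryAt has passed, so ES never sees a negative keep-alive.
    if (remaining <= 0) return `0${format}`;
    return format === 'ms' ? `${remaining}ms` : `${remaining / 1000}s`;
  };
}

const fixed = makeDurationResolver('30s');
console.log(fixed({ startedAt: null, retryAt: null })); // "30s" regardless of fields
```

Because each cursor call re-invokes the resolver, a long-running export requests progressively shorter keep-alives as the task approaches its retry deadline.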

@@ -23,7 +23,7 @@ export interface SearchCursorClients {
export type SearchCursorSettings = Pick<
CsvExportSettings,
'scroll' | 'includeFrozen' | 'maxConcurrentShardRequests'
'scroll' | 'includeFrozen' | 'maxConcurrentShardRequests' | 'taskInstanceFields'
>;
export abstract class SearchCursor {
@@ -33,6 +33,7 @@
protected indexPatternTitle: string,
protected settings: SearchCursorSettings,
protected clients: SearchCursorClients,
protected abortController: AbortController,
protected logger: Logger
) {}


@@ -24,11 +24,12 @@ describe('CSV Export Search Cursor', () => {
beforeEach(async () => {
settings = {
scroll: {
duration: '10m',
duration: jest.fn(() => '10m'),
size: 500,
},
includeFrozen: false,
maxConcurrentShardRequests: 5,
taskInstanceFields: { startedAt: null, retryAt: null },
};
es = elasticsearchServiceMock.createScopedClusterClient();
@@ -37,7 +38,13 @@
logger = loggingSystemMock.createLogger();
cursor = new SearchCursorPit('test-index-pattern-string', settings, { data, es }, logger);
cursor = new SearchCursorPit(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
const openPointInTimeSpy = jest
// @ts-expect-error create spy on private method


@@ -24,9 +24,10 @@ export class SearchCursorPit extends SearchCursor {
indexPatternTitle: string,
settings: SearchCursorSettings,
clients: SearchCursorClients,
abortController: AbortController,
logger: Logger
) {
super(indexPatternTitle, settings, clients, logger);
super(indexPatternTitle, settings, clients, abortController, logger);
}
/**
@@ -37,7 +38,7 @@
}
private async openPointInTime() {
const { includeFrozen, maxConcurrentShardRequests, scroll } = this.settings;
const { includeFrozen, maxConcurrentShardRequests, scroll, taskInstanceFields } = this.settings;
let pitId: string | undefined;
@@ -47,13 +48,14 @@
const response = await this.clients.es.asCurrentUser.openPointInTime(
{
index: this.indexPatternTitle,
keep_alive: scroll.duration,
keep_alive: scroll.duration(taskInstanceFields),
ignore_unavailable: true,
// @ts-expect-error ignore_throttled is not in the type definition, but it is accepted by es
ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES
},
{
requestTimeout: scroll.duration,
signal: this.abortController.signal,
requestTimeout: scroll.duration(taskInstanceFields),
maxRetries: 0,
maxConcurrentShardRequests,
}
@@ -73,7 +75,7 @@
}
private async searchWithPit(searchBody: SearchRequest) {
const { maxConcurrentShardRequests, scroll } = this.settings;
const { maxConcurrentShardRequests, scroll, taskInstanceFields } = this.settings;
const searchParamsPit = {
params: {
@@ -85,22 +87,25 @@
return await lastValueFrom(
this.clients.data.search(searchParamsPit, {
strategy: ES_SEARCH_STRATEGY,
abortSignal: this.abortController.signal,
transport: {
maxRetries: 0, // retrying reporting jobs is handled in the task manager scheduling logic
requestTimeout: scroll.duration,
requestTimeout: scroll.duration(taskInstanceFields),
},
})
);
}
public async getPage(searchSource: ISearchSource) {
const { scroll, taskInstanceFields } = this.settings;
if (!this.cursorId) {
throw new Error(`No access to valid PIT ID!`);
}
searchSource.setField('pit', {
id: this.cursorId,
keep_alive: this.settings.scroll.duration,
keep_alive: scroll.duration(taskInstanceFields),
});
const searchAfter = this.getSearchAfter();


@@ -24,10 +24,11 @@ describe('CSV Export Search Cursor', () => {
beforeEach(async () => {
settings = {
scroll: {
duration: '10m',
duration: jest.fn(() => '10m'),
size: 500,
},
includeFrozen: false,
taskInstanceFields: { startedAt: null, retryAt: null },
maxConcurrentShardRequests: 5,
};
@@ -37,7 +38,14 @@
logger = loggingSystemMock.createLogger();
cursor = new SearchCursorScroll('test-index-pattern-string', settings, { data, es }, logger);
cursor = new SearchCursorScroll(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
await cursor.initialize();
});


@@ -23,22 +23,23 @@ export class SearchCursorScroll extends SearchCursor {
indexPatternTitle: string,
settings: SearchCursorSettings,
clients: SearchCursorClients,
abortController: AbortController,
logger: Logger
) {
super(indexPatternTitle, settings, clients, logger);
super(indexPatternTitle, settings, clients, abortController, logger);
}
// The first search query begins the scroll context in ES
public async initialize() {}
private async scan(searchBody: SearchRequest) {
const { includeFrozen, maxConcurrentShardRequests, scroll } = this.settings;
const { includeFrozen, maxConcurrentShardRequests, scroll, taskInstanceFields } = this.settings;
const searchParamsScan = {
params: {
body: searchBody,
index: this.indexPatternTitle,
scroll: scroll.duration,
scroll: scroll.duration(taskInstanceFields),
size: scroll.size,
ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES
max_concurrent_shard_requests: maxConcurrentShardRequests,
@@ -48,21 +49,23 @@ export class SearchCursorScroll extends SearchCursor {
return await lastValueFrom(
this.clients.data.search(searchParamsScan, {
strategy: ES_SEARCH_STRATEGY,
abortSignal: this.abortController.signal,
transport: {
maxRetries: 0, // retrying reporting jobs is handled in the task manager scheduling logic
requestTimeout: scroll.duration,
requestTimeout: scroll.duration(taskInstanceFields),
},
})
);
}
private async scroll() {
const { duration } = this.settings.scroll;
const { scroll, taskInstanceFields } = this.settings;
return await this.clients.es.asCurrentUser.scroll(
{ scroll: duration, scroll_id: this.cursorId },
{ scroll: scroll.duration(taskInstanceFields), scroll_id: this.cursorId },
{
signal: this.abortController.signal,
maxRetries: 0, // retrying reporting jobs is handled in the task manager scheduling logic
requestTimeout: duration,
requestTimeout: scroll.duration(taskInstanceFields),
}
);
}


@@ -147,6 +147,7 @@ export class CsvV2ExportType extends ExportType<
...job,
},
csvConfig,
taskInstanceFields,
clients,
cancellationToken,
logger,


@@ -74,10 +74,10 @@ const CsvSchema = schema.object({
{ defaultValue: 'pit' }
),
duration: schema.string({
defaultValue: '30s', // this value is passed directly to ES, so string only format is preferred
defaultValue: '30s', // values other than "auto" are passed directly to ES, so a string-format duration is preferred
validate(value) {
if (!/^[0-9]+(d|h|m|s|ms|micros|nanos)$/.test(value)) {
return 'must be a duration string';
if (!/^([0-9]+(d|h|m|s|ms|micros|nanos)|auto)$/.test(value)) {
return 'must be either "auto" or a duration string';
}
},
}),

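A quick standalone check of the widened validation, assuming the pattern anchors the whole alternation (`DURATION_RE` is an illustrative name, not the schema's identifier). Anchoring both branches with `^(...)$` ensures `auto` must match the entire string rather than just a suffix:

```typescript
// Illustrative pattern for the relaxed csv.scroll.duration validation:
// either an integer with an ES time unit, or the literal string 'auto'.
const DURATION_RE = /^([0-9]+(d|h|m|s|ms|micros|nanos)|auto)$/;

for (const ok of ['30s', '500ms', '2m', 'auto']) {
  console.assert(DURATION_RE.test(ok), `${ok} should be accepted`);
}
for (const bad of ['30', 'autos', 'xauto', '-5s']) {
  console.assert(!DURATION_RE.test(bad), `${bad} should be rejected`);
}
```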

@@ -173,7 +173,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
const capacity = this.pool.availableWorkers;
if (!capacity) {
// if there isn't capacity, emit a load event so that we can expose how often
// high load causes the poller to skip work (work isn'tcalled when there is no capacity)
// high load causes the poller to skip work (work isn't called when there is no capacity)
this.emitEvent(asTaskManagerStatEvent('load', asOk(this.pool.workerLoad)));
// Emit event indicating task manager utilization


@@ -518,7 +518,7 @@ export class TaskManagerRunner implements TaskRunner {
error: new Error('Task timeout'),
addDuration: this.definition.timeout,
})) ?? null,
// This is a safe convertion as we're setting the startAt above
// This is a safe conversion as we're setting the startAt above
},
{ validate: false }
)) as ConcreteTaskInstanceWithStartedAt