[Reporting] use point-in-time for paging search results (#144201)

* [Reporting] use point-in-time for paging search results

* add new PIT tests to data plugin

* fix deprecation

* update point-in-time ID to the latest one received

* add warning for shard failure

* fix/cleanup csv generation test

* add requestTimeout to openPit request

* logging polishes

* fix test

* remove confusing comment

Co-authored-by: Jean-Louis Leysens <jeanlouis.leysens@elastic.co>
This commit is contained in:
Tim Sullivan 2022-11-07 10:35:06 -07:00 committed by GitHub
parent 8d5ba8706f
commit 455fb1d1c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 392 additions and 371 deletions

View file

@ -903,6 +903,13 @@ describe('SearchSource', () => {
expect(Object.keys(JSON.parse(searchSourceJSON))).toEqual(['highlightAll', 'from', 'sort']);
});
// A point-in-time reference set as the `pit` field must be the only key in the serialized JSON.
test('should add pit', () => {
  const pointInTime = { id: 'flimflam', keep_alive: '1m' };
  searchSource.setField('pit', pointInTime);

  const serialized = searchSource.serialize();

  expect(serialized.searchSourceJSON).toBe(JSON.stringify({ pit: pointInTime }));
});
test('should serialize filters', () => {
const filter = [
{

View file

@ -667,6 +667,8 @@ export class SearchSource {
getConfig(UI_SETTINGS.SORT_OPTIONS)
);
return addToBody(key, sort);
case 'pit':
return addToRoot(key, val);
case 'aggs':
if ((val as unknown) instanceof AggConfigs) {
return addToBody('aggs', val.toDsl());
@ -768,7 +770,7 @@ export class SearchSource {
const { getConfig } = this.dependencies;
const searchRequest = this.mergeProps();
searchRequest.body = searchRequest.body || {};
const { body, index, query, filters, highlightAll } = searchRequest;
const { body, index, query, filters, highlightAll, pit } = searchRequest;
searchRequest.indexType = this.getIndexType(index);
const metaFields = getConfig(UI_SETTINGS.META_FIELDS) ?? [];
@ -911,6 +913,10 @@ export class SearchSource {
delete searchRequest.highlightAll;
}
if (pit) {
body.pit = pit;
}
return searchRequest;
}

View file

@ -39,6 +39,9 @@ export interface ISearchStartSearchSource
createEmpty: () => ISearchSource;
}
/**
* @deprecated use {@link estypes.SortResults} instead.
*/
export type EsQuerySearchAfter = [string | number, string | number];
export enum SortDirection {
@ -112,9 +115,13 @@ export interface SearchSourceFields {
* {@link IndexPatternService}
*/
index?: DataView;
searchAfter?: EsQuerySearchAfter;
timeout?: string;
terminate_after?: number;
searchAfter?: estypes.SortResults;
/**
* Allow querying to use a point-in-time ID for paging results
*/
pit?: estypes.SearchPointInTimeReference;
parent?: SearchSourceFields;
}
@ -160,7 +167,7 @@ export type SerializedSearchSourceFields = {
* {@link IndexPatternService}
*/
index?: string | DataViewSpec;
searchAfter?: EsQuerySearchAfter;
searchAfter?: estypes.SortResults;
timeout?: string;
terminate_after?: number;

View file

@ -35,13 +35,17 @@ export const esSearchStrategyProvider = (
throw new KbnServerError(`Unsupported index pattern type ${request.indexType}`, 400);
}
const isPit = request.params?.body?.pit != null;
const search = async () => {
try {
const config = await firstValueFrom(config$);
// @ts-expect-error params fall back to any, but should be valid SearchRequest params
const { terminateAfter, ...requestParams } = request.params ?? {};
const defaults = await getDefaultSearchParams(uiSettingsClient, { isPit });
const params = {
...(await getDefaultSearchParams(uiSettingsClient)),
...defaults,
...getShardTimeout(config),
...(terminateAfter ? { terminate_after: terminateAfter } : {}),
...requestParams,

View file

@ -18,19 +18,29 @@ export function getShardTimeout(
}
export async function getDefaultSearchParams(
uiSettingsClient: Pick<IUiSettingsClient, 'get'>
uiSettingsClient: Pick<IUiSettingsClient, 'get'>,
options = { isPit: false }
): Promise<{
max_concurrent_shard_requests?: number;
ignore_unavailable: boolean;
ignore_unavailable?: boolean;
track_total_hits: boolean;
}> {
const maxConcurrentShardRequests = await uiSettingsClient.get<number>(
UI_SETTINGS.COURIER_MAX_CONCURRENT_SHARD_REQUESTS
);
return {
const defaults: Awaited<ReturnType<typeof getDefaultSearchParams>> = {
max_concurrent_shard_requests:
maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined,
ignore_unavailable: true, // Don't fail if the index/indices don't exist
track_total_hits: true,
};
// If the request has a point-in-time ID attached, it cannot include ignore_unavailable from {@link estypes.IndicesOptions}.
// ES will reject the request as that option was set when the point-in-time was created.
// Otherwise, this option allows search to not fail when the index/indices don't exist
if (!options.isPit) {
defaults.ignore_unavailable = true;
}
return defaults;
}

View file

@ -73,7 +73,7 @@ exports[`keeps order of the columns during the scroll 1`] = `
"
`;
exports[`uses the scrollId to page all the data 1`] = `
exports[`uses the pit ID to page all the data 1`] = `
"date,ip,message
\\"2020-12-31T00:14:28.000Z\\",\\"110.135.176.89\\",\\"hit from the initial search\\"
\\"2020-12-31T00:14:28.000Z\\",\\"110.135.176.89\\",\\"hit from the initial search\\"

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { errors as esErrors } from '@elastic/elasticsearch';
import { errors as esErrors, estypes } from '@elastic/elasticsearch';
import type { SearchResponse } from '@elastic/elasticsearch/lib/api/types';
import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server';
import {
@ -50,6 +50,7 @@ const searchSourceMock = {
...searchSourceInstanceMock,
getSearchRequestBody: jest.fn(() => ({})),
};
const mockSearchSourceService: jest.Mocked<ISearchStartSearchSource> = {
create: jest.fn().mockReturnValue(searchSourceMock),
createEmpty: jest.fn().mockReturnValue(searchSourceMock),
@ -58,19 +59,21 @@ const mockSearchSourceService: jest.Mocked<ISearchStartSearchSource> = {
extract: jest.fn(),
getAllMigrations: jest.fn(),
};
/** Point-in-time ID used by the mocks in this test file. */
const mockPitId = 'oju9fs3698s3902f02-8qg3-u9w36oiewiuyew6';

/**
 * Builds a mock raw search response that carries the mock point-in-time ID.
 *
 * @param hits - the hits to place in the response; defaults to an empty list
 * @param total - the reported total hit count; defaults to the number of hits given
 * @returns a response object with a single successful shard and no failures
 */
const getMockRawResponse = (hits: Array<estypes.SearchHit<unknown>> = [], total = hits.length) => {
  const shardSummary = { total: 1, successful: 1, failed: 0, skipped: 0 };
  const hitsSection = { hits, total, max_score: 0 };

  return {
    took: 1,
    timed_out: false,
    pit_id: mockPitId,
    _shards: shardSummary,
    hits: hitsSection,
  };
};
const mockDataClientSearchDefault = jest.fn().mockImplementation(
(): Rx.Observable<{ rawResponse: SearchResponse<unknown> }> =>
Rx.of({
rawResponse: {
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, failed: 0, skipped: 0 },
hits: {
hits: [],
total: 0,
max_score: 0,
},
},
rawResponse: getMockRawResponse(),
})
);
@ -92,6 +95,8 @@ beforeEach(async () => {
mockDataClient = dataPluginMock.createStartContract().search.asScoped({} as any);
mockDataClient.search = mockDataClientSearchDefault;
mockEsClient.asCurrentUser.openPointInTime = jest.fn().mockResolvedValueOnce({ id: mockPitId });
uiSettingsClient = uiSettingsServiceMock
.createStartContract()
.asScopedToClient(savedObjectsClientMock.create());
@ -117,6 +122,8 @@ beforeEach(async () => {
searchSourceMock.getField = jest.fn((key: string) => {
switch (key) {
case 'pit':
return { id: mockPitId };
case 'index':
return {
fields: {
@ -125,6 +132,7 @@ beforeEach(async () => {
},
metaFields: ['_id', '_index', '_type', '_score'],
getFormatterForField: jest.fn(),
getIndexPattern: () => 'logstash-*',
};
}
});
@ -157,20 +165,15 @@ it('formats an empty search result to CSV content', async () => {
it('formats a search result to CSV content', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
fields: {
date: `["2020-12-31T00:14:28.000Z"]`,
ip: `["110.135.176.89"]`,
message: `["This is a great message!"]`,
},
},
],
total: 1,
},
},
rawResponse: getMockRawResponse([
{
fields: {
date: `["2020-12-31T00:14:28.000Z"]`,
ip: `["110.135.176.89"]`,
message: `["This is a great message!"]`,
},
} as unknown as estypes.SearchHit,
]),
})
);
const generateCsv = new CsvGenerator(
@ -199,16 +202,16 @@ const HITS_TOTAL = 100;
it('calculates the bytes of the content', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: range(0, HITS_TOTAL).map(() => ({
fields: {
message: ['this is a great message'],
},
})),
total: HITS_TOTAL,
},
},
rawResponse: getMockRawResponse(
range(0, HITS_TOTAL).map(
() =>
({
fields: {
message: ['this is a great message'],
},
} as unknown as estypes.SearchHit)
)
),
})
);
@ -246,18 +249,18 @@ it('warns if max size was reached', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: range(0, HITS_TOTAL).map(() => ({
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: ['super cali fragile istic XPLA docious'],
},
})),
total: HITS_TOTAL,
},
},
rawResponse: getMockRawResponse(
range(0, HITS_TOTAL).map(
() =>
({
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: ['super cali fragile istic XPLA docious'],
},
} as unknown as estypes.SearchHit)
)
),
})
);
@ -283,36 +286,42 @@ it('warns if max size was reached', async () => {
expect(content).toMatchSnapshot();
});
it('uses the scrollId to page all the data', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
_scroll_id: 'awesome-scroll-hero',
hits: {
hits: range(0, HITS_TOTAL / 10).map(() => ({
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: ['hit from the initial search'],
},
})),
total: HITS_TOTAL,
},
},
})
);
mockEsClient.asCurrentUser.scroll = jest.fn().mockResolvedValue({
hits: {
hits: range(0, HITS_TOTAL / 10).map(() => ({
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: ['hit from a subsequent scroll'],
},
})),
},
});
it('uses the pit ID to page all the data', async () => {
mockDataClient.search = jest
.fn()
.mockImplementationOnce(() =>
Rx.of({
rawResponse: getMockRawResponse(
range(0, HITS_TOTAL / 10).map(
() =>
({
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: ['hit from the initial search'],
},
} as unknown as estypes.SearchHit)
),
HITS_TOTAL
),
})
)
.mockImplementation(() =>
Rx.of({
rawResponse: getMockRawResponse(
range(0, HITS_TOTAL / 10).map(
() =>
({
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: ['hit from a subsequent scroll'],
},
} as unknown as estypes.SearchHit)
)
),
})
);
const generateCsv = new CsvGenerator(
createMockJob({ columns: ['date', 'ip', 'message'] }),
@ -334,70 +343,55 @@ it('uses the scrollId to page all the data', async () => {
expect(csvResult.warnings).toEqual([]);
expect(content).toMatchSnapshot();
expect(mockDataClient.search).toHaveBeenCalledTimes(1);
expect(mockDataClient.search).toHaveBeenCalledTimes(10);
expect(mockDataClient.search).toBeCalledWith(
{ params: { body: {}, ignore_throttled: undefined, scroll: '30s', size: 500 } },
{ params: { body: {}, ignore_throttled: undefined } },
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
);
// `scroll` and `clearScroll` must be called with scroll ID in the post body!
expect(mockEsClient.asCurrentUser.scroll).toHaveBeenCalledTimes(9);
expect(mockEsClient.asCurrentUser.scroll).toHaveBeenCalledWith({
scroll: '30s',
scroll_id: 'awesome-scroll-hero',
});
expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledTimes(1);
expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith(
{
ignore_unavailable: true,
index: 'logstash-*',
keep_alive: '30s',
},
{ maxRetries: 0, requestTimeout: '30s' }
);
expect(mockEsClient.asCurrentUser.clearScroll).toHaveBeenCalledTimes(1);
expect(mockEsClient.asCurrentUser.clearScroll).toHaveBeenCalledWith({
scroll_id: ['awesome-scroll-hero'],
expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledTimes(1);
expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledWith({
body: { id: mockPitId },
});
});
it('keeps order of the columns during the scroll', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
_scroll_id: 'awesome-scroll-hero',
hits: {
hits: [
{
fields: {
a: ['a1'],
b: ['b1'],
},
},
],
total: 3,
},
},
})
);
mockEsClient.asCurrentUser.scroll = jest
mockDataClient.search = jest
.fn()
.mockResolvedValueOnce({
hits: {
hits: [
{
fields: {
b: ['b2'],
},
},
],
},
})
.mockResolvedValueOnce({
hits: {
hits: [
{
fields: {
a: ['a3'],
c: ['c3'],
},
},
],
},
});
.mockImplementationOnce(() =>
Rx.of({
rawResponse: getMockRawResponse(
[{ fields: { a: ['a1'], b: ['b1'] } } as unknown as estypes.SearchHit],
3
),
})
)
.mockImplementationOnce(() =>
Rx.of({
rawResponse: getMockRawResponse(
[{ fields: { b: ['b2'] } } as unknown as estypes.SearchHit],
3
),
})
)
.mockImplementationOnce(() =>
Rx.of({
rawResponse: getMockRawResponse(
[{ fields: { a: ['a3'], c: ['c3'] } } as unknown as estypes.SearchHit],
3
),
})
);
const generateCsv = new CsvGenerator(
createMockJob({ searchSource: {}, columns: [] }),
@ -424,21 +418,16 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => {
it('cells can be multi-value', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
sku: [`This is a cool SKU.`, `This is also a cool SKU.`],
},
},
],
total: 1,
rawResponse: getMockRawResponse([
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
sku: [`This is a cool SKU.`, `This is also a cool SKU.`],
},
},
},
]),
})
);
@ -466,22 +455,17 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => {
it('provides top-level underscored fields as columns', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
date: ['2020-12-31T00:14:28.000Z'],
message: [`it's nice to see you`],
},
},
],
total: 1,
rawResponse: getMockRawResponse([
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
date: ['2020-12-31T00:14:28.000Z'],
message: [`it's nice to see you`],
},
},
},
]),
})
);
@ -520,28 +504,23 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => {
it('sorts the fields when they are to be used as table column names', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
date: ['2020-12-31T00:14:28.000Z'],
message_z: [`test field Z`],
message_y: [`test field Y`],
message_x: [`test field X`],
message_w: [`test field W`],
message_v: [`test field V`],
message_u: [`test field U`],
message_t: [`test field T`],
},
},
],
total: 1,
rawResponse: getMockRawResponse([
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
date: ['2020-12-31T00:14:28.000Z'],
message_z: [`test field Z`],
message_y: [`test field Y`],
message_x: [`test field X`],
message_w: [`test field W`],
message_v: [`test field V`],
message_u: [`test field U`],
message_t: [`test field T`],
},
},
},
]),
})
);
@ -581,22 +560,17 @@ describe('fields from job.columns (7.13+ generated)', () => {
it('cells can be multi-value', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
product: 'coconut',
category: [`cool`, `rad`],
},
},
],
total: 1,
rawResponse: getMockRawResponse([
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
product: 'coconut',
category: [`cool`, `rad`],
},
},
},
]),
})
);
@ -624,22 +598,17 @@ describe('fields from job.columns (7.13+ generated)', () => {
it('columns can be top-level fields such as _id and _index', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
product: 'coconut',
category: [`cool`, `rad`],
},
},
],
total: 1,
rawResponse: getMockRawResponse([
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
product: 'coconut',
category: [`cool`, `rad`],
},
},
},
]),
})
);
@ -667,22 +636,17 @@ describe('fields from job.columns (7.13+ generated)', () => {
it('default column names come from tabify', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
product: 'coconut',
category: [`cool`, `rad`],
},
},
],
total: 1,
rawResponse: getMockRawResponse([
{
_id: 'my-cool-id',
_index: 'my-cool-index',
_version: 4,
fields: {
product: 'coconut',
category: [`cool`, `rad`],
},
},
},
]),
})
);
@ -714,20 +678,15 @@ describe('formulas', () => {
it(`escapes formula values in a cell, doesn't warn the csv contains formulas`, async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: [TEST_FORMULA],
},
},
],
total: 1,
},
},
rawResponse: getMockRawResponse([
{
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: [TEST_FORMULA],
},
} as unknown as estypes.SearchHit,
]),
})
);
@ -757,20 +716,15 @@ describe('formulas', () => {
it(`escapes formula values in a header, doesn't warn the csv contains formulas`, async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
[TEST_FORMULA]: 'This is great data',
},
},
],
total: 1,
},
},
rawResponse: getMockRawResponse([
{
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
[TEST_FORMULA]: 'This is great data',
},
} as unknown as estypes.SearchHit,
]),
})
);
@ -808,20 +762,15 @@ describe('formulas', () => {
});
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
hits: {
hits: [
{
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: [TEST_FORMULA],
},
},
],
total: 1,
},
},
rawResponse: getMockRawResponse([
{
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: [TEST_FORMULA],
},
} as unknown as estypes.SearchHit,
]),
})
);
@ -875,8 +824,6 @@ it('can override ignoring frozen indices', async () => {
params: {
body: {},
ignore_throttled: false,
scroll: '30s',
size: 500,
},
},
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
@ -928,7 +875,7 @@ it('will return partial data if the scroll or search fails', async () => {
expect(mockLogger.error.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
"CSV export scan error: ResponseError: my error",
"CSV export search error: ResponseError: my error",
],
Array [
[ResponseError: my error],
@ -978,27 +925,27 @@ it('handles unknown errors', async () => {
describe('error codes', () => {
it('returns the expected error code when authentication expires', async () => {
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
rawResponse: {
_scroll_id: 'test',
hits: {
hits: range(0, 5).map(() => ({
mockDataClient.search = jest
.fn()
.mockImplementationOnce(() =>
Rx.of({
rawResponse: getMockRawResponse(
range(0, 5).map(() => ({
_index: 'lasdf',
_id: 'lasdf123',
fields: {
date: ['2020-12-31T00:14:28.000Z'],
ip: ['110.135.176.89'],
message: ['super cali fragile istic XPLA docious'],
},
})),
total: 10,
},
},
})
);
mockEsClient.asCurrentUser.scroll = jest.fn().mockImplementation(() => {
throw new esErrors.ResponseError({ statusCode: 403, meta: {} as any, warnings: [] });
});
10
),
})
)
.mockImplementationOnce(() => {
throw new esErrors.ResponseError({ statusCode: 403, meta: {} as any, warnings: [] });
});
const generateCsv = new CsvGenerator(
createMockJob({ columns: ['date', 'ip', 'message'] }),
@ -1029,7 +976,7 @@ describe('error codes', () => {
expect(mockLogger.error.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
"CSV export scroll error: ResponseError: Response Error",
"CSV export search error: ResponseError: Response Error",
],
Array [
[ResponseError: Response Error],

View file

@ -5,15 +5,9 @@
* 2.0.
*/
import { errors as esErrors } from '@elastic/elasticsearch';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { errors as esErrors, estypes } from '@elastic/elasticsearch';
import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server';
import type {
DataView,
ISearchSource,
ISearchStartSearchSource,
SearchRequest,
} from '@kbn/data-plugin/common';
import type { ISearchSource, ISearchStartSearchSource } from '@kbn/data-plugin/common';
import { cellHasFormulas, ES_SEARCH_STRATEGY, tabifyDocs } from '@kbn/data-plugin/common';
import type { IScopedSearchClient } from '@kbn/data-plugin/server';
import type { Datatable } from '@kbn/expressions-plugin/server';
@ -61,21 +55,63 @@ export class CsvGenerator {
private stream: Writable
) {}
private async scan(index: DataView, searchSource: ISearchSource, settings: CsvExportSettings) {
/**
 * Opens a point-in-time for the given index pattern and returns its ID.
 *
 * The scroll duration from the export settings is used both as the point-in-time
 * keep-alive and as the request timeout. A failure of the open request is logged
 * and then reported as a missing ID.
 *
 * @param indexPatternTitle - the index pattern to open the point-in-time against
 * @param settings - export settings; only `scroll.duration` is read here
 * @returns the ID of the opened point-in-time
 * @throws Error when no point-in-time ID was received
 */
private async openPointInTime(indexPatternTitle: string, settings: CsvExportSettings) {
  const keepAlive = settings.scroll.duration;
  this.logger.debug(`Requesting point-in-time for: [${indexPatternTitle}]...`);

  const requestParams = {
    index: indexPatternTitle,
    keep_alive: keepAlive,
    ignore_unavailable: true,
  };
  // no transport-level retries; the request is bounded by the same duration as the keep-alive
  const transportOptions = {
    requestTimeout: keepAlive,
    maxRetries: 0,
  };

  let openedId: string | undefined;
  try {
    // NOTE: if ES is overloaded, this request could time out
    const response = await this.clients.es.asCurrentUser.openPointInTime(
      requestParams,
      transportOptions
    );
    openedId = response.id;
  } catch (err) {
    // the failure surfaces below as a missing ID
    this.logger.error(err);
  }

  if (openedId) {
    this.logger.debug(`Opened PIT ID: ${this.truncatePitId(openedId)}`);
    return openedId;
  }

  throw new Error(`Could not receive a point-in-time ID!`);
}
private async doSearch(
searchSource: ISearchSource,
settings: CsvExportSettings,
searchAfter?: estypes.SortResults
) {
const { scroll: scrollSettings, includeFrozen } = settings;
const searchBody: SearchRequest | undefined = searchSource.getSearchRequestBody();
searchSource.setField('size', scrollSettings.size);
if (searchAfter) {
searchSource.setField('searchAfter', searchAfter);
}
const pitId = searchSource.getField('pit')?.id;
this.logger.debug(
`Executing search request with PIT ID: [${this.truncatePitId(pitId)}]` +
(searchAfter ? ` search_after: [${searchAfter}]` : '')
);
const searchBody: estypes.SearchRequest = searchSource.getSearchRequestBody();
if (searchBody == null) {
throw new Error('Could not retrieve the search body!');
}
this.logger.debug(`Tracking total hits with: track_total_hits=${searchBody.track_total_hits}`);
this.logger.info(`Executing search request...`);
const searchParams = {
params: {
body: searchBody,
index: index.title,
scroll: scrollSettings.duration,
size: scrollSettings.size,
ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES
},
};
@ -88,35 +124,19 @@ export class CsvGenerator {
strategy: ES_SEARCH_STRATEGY,
transport: {
maxRetries: 0, // retrying reporting jobs is handled in the task manager scheduling logic
requestTimeout: this.config.scroll.duration,
requestTimeout: scrollSettings.duration,
},
})
)
).rawResponse as estypes.SearchResponse<unknown>;
).rawResponse;
} catch (err) {
this.logger.error(`CSV export scan error: ${err}`);
this.logger.error(`CSV export search error: ${err}`);
throw err;
}
return results;
}
private async scroll(scrollId: string, scrollSettings: CsvExportSettings['scroll']) {
this.logger.info(`Executing scroll request...`);
let results: estypes.SearchResponse<unknown> | undefined;
try {
results = await this.clients.es.asCurrentUser.scroll({
scroll: scrollSettings.duration,
scroll_id: scrollId,
});
} catch (err) {
this.logger.error(`CSV export scroll error: ${err}`);
throw err;
}
return results;
}
/*
* Load field formats for each field in the list
*/
@ -202,7 +222,7 @@ export class CsvGenerator {
builder: MaxSizeStringBuilder,
settings: CsvExportSettings
) {
this.logger.debug(`Building CSV header row...`);
this.logger.debug(`Building CSV header row`);
const header =
Array.from(columns).map(this.escapeValues(settings)).join(settings.separator) + '\n';
@ -225,7 +245,7 @@ export class CsvGenerator {
formatters: Record<string, FieldFormat>,
settings: CsvExportSettings
) {
this.logger.debug(`Building ${table.rows.length} CSV data rows...`);
this.logger.debug(`Building ${table.rows.length} CSV data rows`);
for (const dataTableRow of table.rows) {
if (this.cancellationToken.isCancelled()) {
break;
@ -293,26 +313,28 @@ export class CsvGenerator {
throw new Error(`The search must have a reference to an index pattern!`);
}
const { maxSizeBytes, bom, escapeFormulaValues, scroll: scrollSettings } = settings;
const { maxSizeBytes, bom, escapeFormulaValues, timezone } = settings;
const indexPatternTitle = index.getIndexPattern();
const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom);
const warnings: string[] = [];
let first = true;
let currentRecord = -1;
let totalRecords: number | undefined;
let totalRelation = 'eq';
let scrollId: string | undefined;
let searchAfter: estypes.SortResults | undefined;
let pitId = await this.openPointInTime(indexPatternTitle, settings);
// apply timezone from the job to all date field formatters
try {
index.fields.getByType('date').forEach(({ name }) => {
this.logger.debug(`setting timezone on ${name}`);
this.logger.debug(`Setting timezone on ${name}`);
const format: FieldFormatConfig = {
...index.fieldFormatMap[name],
id: index.fieldFormatMap[name]?.id || 'date', // allow id: date_nanos
params: {
...index.fieldFormatMap[name]?.params,
timezone: settings.timezone,
timezone,
},
};
index.setFieldFormat(name, format);
@ -327,24 +349,20 @@ export class CsvGenerator {
if (this.cancellationToken.isCancelled()) {
break;
}
let results: estypes.SearchResponse<unknown> | undefined;
if (scrollId == null) {
// open a scroll cursor in Elasticsearch
results = await this.scan(index, searchSource, settings);
scrollId = results?._scroll_id;
if (results?.hits?.total != null) {
const { hits } = results;
if (typeof hits.total === 'number') {
totalRecords = hits.total;
} else {
totalRecords = hits.total?.value;
totalRelation = hits.total?.relation ?? 'unknown';
}
this.logger.info(`Total hits: [${totalRecords}].` + `Accuracy: ${totalRelation}`);
// set the latest pit, which could be different from the last request
searchSource.setField('pit', { id: pitId, keep_alive: settings.scroll.duration });
const results = await this.doSearch(searchSource, settings, searchAfter);
const { hits } = results;
if (first && hits.total != null) {
if (typeof hits.total === 'number') {
totalRecords = hits.total;
} else {
totalRecords = hits.total?.value;
totalRelation = hits.total?.relation ?? 'unknown';
}
} else {
// use the scroll cursor in Elasticsearch
results = await this.scroll(scrollId, scrollSettings);
this.logger.info(`Total hits ${totalRelation} ${totalRecords}.`);
}
if (!results) {
@ -352,13 +370,35 @@ export class CsvGenerator {
break;
}
// TODO check for shard failures, log them and add a warning if found
{
const {
hits: { hits, ...hitsMeta },
...header
} = results;
this.logger.debug('Results metadata: ' + JSON.stringify({ header, hitsMeta }));
const {
hits: { hits: _hits, ...hitsMeta },
...headerWithPit
} = results;
const { pit_id: newPitId, ...header } = headerWithPit;
const logInfo = {
header: { pit_id: `${this.truncatePitId(newPitId)}`, ...header },
hitsMeta,
};
this.logger.debug(`Results metadata: ${JSON.stringify(logInfo)}`);
// use the most recently received id for the next search request
this.logger.debug(`Received PIT ID: [${this.truncatePitId(results.pit_id)}]`);
pitId = results.pit_id ?? pitId;
// Update last sort results for next query. PIT is used, so the sort results
// automatically include _shard_doc as a tiebreaker
searchAfter = hits.hits[hits.hits.length - 1]?.sort as estypes.SortResults | undefined;
this.logger.debug(`Received search_after: [${searchAfter}]`);
// check for shard failures, log them and add a warning if found
const { _shards: shards } = header;
if (shards.failures) {
shards.failures.forEach(({ reason }) => {
warnings.push(`Shard failure: ${JSON.stringify(reason)}`);
this.logger.warn(JSON.stringify(reason));
});
}
let table: Datatable | undefined;
@ -411,16 +451,12 @@ export class CsvGenerator {
warnings.push(i18nTexts.unknownError(err?.message ?? err));
}
} finally {
// clear scrollID
if (scrollId) {
this.logger.debug(`Executing clearScroll request`);
try {
await this.clients.es.asCurrentUser.clearScroll({ scroll_id: [scrollId] });
} catch (err) {
this.logger.error(err);
}
// close the point-in-time that was opened for this export
if (pitId) {
this.logger.debug(`Closing point-in-time`);
await this.clients.es.asCurrentUser.closePointInTime({ body: { id: pitId } });
} else {
this.logger.warn(`No scrollId to clear!`);
this.logger.warn(`No PIT ID to clear!`);
}
}
@ -429,7 +465,7 @@ export class CsvGenerator {
if (!this.maxSizeReached && this.csvRowCount !== totalRecords) {
this.logger.warn(
`ES scroll returned fewer total hits than expected! ` +
`Search result total hits: ${totalRecords}. Row count: ${this.csvRowCount}.`
`Search result total hits: ${totalRecords}. Row count: ${this.csvRowCount}`
);
warnings.push(
i18nTexts.csvRowCountError({ expected: totalRecords ?? NaN, received: this.csvRowCount })
@ -447,4 +483,8 @@ export class CsvGenerator {
error_code: reportingError?.code,
};
}
/**
 * Shortens a point-in-time ID so it can be written to the logs compactly.
 *
 * @param pitId - the point-in-time ID, or `undefined` when none is available
 * @returns the first 12 characters followed by `...` when the ID is longer than that,
 * the ID unchanged when it is 12 characters or shorter, and `<none>` when no ID was given
 */
private truncatePitId(pitId: string | undefined) {
  const maxLength = 12;

  if (pitId == null) {
    // previously `undefined` was concatenated into the literal text "undefined..."
    return '<none>';
  }

  // only append the ellipsis when characters were actually dropped
  return pitId.length > maxLength ? `${pitId.substring(0, maxLength)}...` : pitId;
}
}