[ML] File data visualizer reduce chunk size for slow processors (#121353)

* [ML] File data visualizer reduce chunk size for slow processors

* adding constant

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
James Gowdy, 2022-01-04 15:12:28 +00:00 (committed by GitHub)
parent a08da2e9ec
commit 078f176c04
4 changed files with 62 additions and 23 deletions


@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const isPopulatedObject = <U extends string = string>(
arg: unknown,
requiredAttributes: U[] = []
): arg is Record<U, unknown> => {
return (
typeof arg === 'object' &&
arg !== null &&
Object.keys(arg).length > 0 &&
(requiredAttributes.length === 0 ||
requiredAttributes.every((d) => ({}.hasOwnProperty.call(arg, d))))
);
};
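
A quick illustration of how this shared type guard behaves (illustrative calls only, not part of the commit):

// The guard narrows `arg` to Record<'processors', unknown> when the
// required key is present, so the property can be read without a cast.
const maybePipeline: unknown = { processors: [{ set: { field: 'status', value: 'ok' } }] };

if (isPopulatedObject(maybePipeline, ['processors'])) {
  console.log(Array.isArray(maybePipeline.processors)); // true
}

isPopulatedObject({});              // false: no keys
isPopulatedObject(null);            // false: not an object
isPopulatedObject({ a: 1 }, ['b']); // false: required attribute missing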


@@ -5,7 +5,7 @@
* 2.0.
*/
import { chunk } from 'lodash';
import { chunk, intersection } from 'lodash';
import moment from 'moment';
import { i18n } from '@kbn/i18n';
import { getHttp } from '../kibana_services';
@@ -19,14 +19,17 @@ import {
MB,
} from '../../common';
import { CreateDocsResponse, IImporter, ImportResults } from './types';
import { isPopulatedObject } from '../../common/utils';
const CHUNK_SIZE = 5000;
const REDUCED_CHUNK_SIZE = 100;
export const MAX_CHUNK_CHAR_COUNT = 1000000;
export const IMPORT_RETRIES = 5;
const STRING_CHUNKS_MB = 100;
export abstract class Importer implements IImporter {
protected _docArray: ImportDoc[] = [];
private _chunkSize = CHUNK_SIZE;
public read(data: ArrayBuffer) {
const decoder = new TextDecoder();
@@ -66,6 +69,12 @@ export abstract class Importer implements IImporter {
) {
updatePipelineTimezone(pipeline);
if (pipelineContainsSpecialProcessors(pipeline)) {
// pipeline contains processors which we know are slow
// so reduce the chunk size significantly to avoid timeouts
this._chunkSize = REDUCED_CHUNK_SIZE;
}
// if no pipeline has been supplied,
// send an empty object
const ingestPipeline =
@@ -101,7 +110,7 @@ export abstract class Importer implements IImporter {
};
}
const chunks = createDocumentChunks(this._docArray);
const chunks = createDocumentChunks(this._docArray, this._chunkSize);
const ingestPipeline = {
id: pipelineId,
@@ -153,11 +162,11 @@ export abstract class Importer implements IImporter {
console.error(resp);
success = false;
error = resp.error;
populateFailures(resp, failures, i);
populateFailures(resp, failures, i, this._chunkSize);
break;
}
populateFailures(resp, failures, i);
populateFailures(resp, failures, i, this._chunkSize);
}
const result: ImportResults = {
@@ -176,13 +185,18 @@ export abstract class Importer implements IImporter {
}
}
function populateFailures(error: ImportResponse, failures: ImportFailure[], chunkCount: number) {
function populateFailures(
error: ImportResponse,
failures: ImportFailure[],
chunkCount: number,
chunkSize: number
) {
if (error.failures && error.failures.length) {
// update the item value to include the chunk count
// e.g. with the default chunk size of 5000, item 3 in chunk 2 is actually item 10003
for (let f = 0; f < error.failures.length; f++) {
const failure = error.failures[f];
failure.item = failure.item + CHUNK_SIZE * chunkCount;
failure.item = failure.item + chunkSize * chunkCount;
}
failures.push(...error.failures);
}
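
For context, passing this._chunkSize through keeps the reported item numbers aligned with the original upload even when the reduced chunk size is active. A minimal illustrative sketch (the adjustItem helper is hypothetical, not part of the commit):

// Hypothetical helper mirroring the adjustment above.
const adjustItem = (item: number, chunkCount: number, chunkSize: number) =>
  item + chunkSize * chunkCount;

const withReducedSize = adjustItem(3, 2, 100);  // 203: item 3 of the third chunk (index 2)
const withDefaultSize = adjustItem(3, 2, 5000); // 10003 with the default chunk size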
@@ -207,10 +221,10 @@ function updatePipelineTimezone(ingestPipeline: IngestPipeline) {
}
}
function createDocumentChunks(docArray: ImportDoc[]) {
function createDocumentChunks(docArray: ImportDoc[], chunkSize: number) {
const chunks: ImportDoc[][] = [];
// chop docArray into 5000 doc chunks
const tempChunks = chunk(docArray, CHUNK_SIZE);
// chop docArray into chunks
const tempChunks = chunk(docArray, chunkSize);
// loop over tempChunks and check that the total character length
// for each chunk is within the MAX_CHUNK_CHAR_COUNT.
@@ -236,6 +250,24 @@ function createDocumentChunks(docArray: ImportDoc[]) {
return chunks;
}
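
To illustrate the effect of the new chunkSize parameter on this helper (hypothetical documents, and assuming every chunk stays under MAX_CHUNK_CHAR_COUNT):

// 10000 small documents, for illustration only.
const docs = Array.from({ length: 10000 }, (_, i) => ({ message: `doc ${i}` }));

const normalChunks = createDocumentChunks(docs, 5000); // 2 chunks with the default size
const reducedChunks = createDocumentChunks(docs, 100); // 100 chunks when the reduced size is in effect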
function pipelineContainsSpecialProcessors(pipeline: IngestPipeline) {
const findKeys = (obj: object) => {
// return all nested keys in the pipeline
const keys: string[] = [];
Object.entries(obj).forEach(([key, val]) => {
keys.push(key);
if (isPopulatedObject(val)) {
keys.push(...findKeys(val));
}
});
return keys;
};
const keys = findKeys(pipeline);
const specialProcessors = ['inference', 'enrich'];
return intersection(specialProcessors, keys).length !== 0;
}
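
A sketch of the detection in action, using hypothetical pipeline definitions (not part of the commit): findKeys walks the pipeline recursively, so an inference or enrich processor nested anywhere in it is enough to trigger the reduced chunk size.

const plainPipeline = {
  description: '',
  processors: [{ set: { field: 'status', value: 'ok' } }],
};
const inferencePipeline = {
  description: '',
  processors: [{ inference: { model_id: 'my-model', field_map: {} } }],
};

const plainIsSpecial = pipelineContainsSpecialProcessors(plainPipeline);         // false: chunk size stays at CHUNK_SIZE (5000)
const inferenceIsSpecial = pipelineContainsSpecialProcessors(inferencePipeline); // true: importer switches to REDUCED_CHUNK_SIZE (100)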
export function callImportRoute({
id,
index,


@@ -6,7 +6,7 @@
*/
import { IScopedClusterClient } from 'kibana/server';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { isPopulatedObject } from './utils/runtime_field_utils';
import { isPopulatedObject } from '../common/utils';
export async function getTimeFieldRange(
client: IScopedClusterClient,


@@ -7,22 +7,10 @@
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { RUNTIME_FIELD_TYPES } from '../../../../../src/plugins/data/common';
import { isPopulatedObject } from '../../common/utils';
type RuntimeType = typeof RUNTIME_FIELD_TYPES[number];
export const isPopulatedObject = <U extends string = string>(
arg: unknown,
requiredAttributes: U[] = []
): arg is Record<U, unknown> => {
return (
typeof arg === 'object' &&
arg !== null &&
Object.keys(arg).length > 0 &&
(requiredAttributes.length === 0 ||
requiredAttributes.every((d) => ({}.hasOwnProperty.call(arg, d))))
);
};
export function isRuntimeField(arg: unknown): arg is estypes.MappingRuntimeField {
return (
((isPopulatedObject(arg, ['type']) && Object.keys(arg).length === 1) ||