[Maps] chunk geojson upload to keep import requests under 1MB (#93678)

* [Maps] chunk geojson upload to keep import requests under 1MB

* fix geojson_importer tests

* update failure.item to reflect location in file

* remove console statement

* clean up

* return instead of break if upload is no longer active

* add unit test for createChunks

* update file_upload API

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Commit 4dd0a7e0f7 (parent 2b3bac95c1)
Nathan Reese, 2021-03-08 10:25:59 -07:00, committed by GitHub
5 changed files with 281 additions and 128 deletions
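The technique the bullets above describe is character-budgeted chunking: each block of features read from the file is converted to Elasticsearch documents, then split so the serialized body of any single import request stays under MAX_CHUNK_CHAR_COUNT (1,000,000 characters, roughly 1MB for ASCII-heavy GeoJSON). A minimal TypeScript sketch of the idea, distilled from the createChunks() diff below (the standalone Doc alias and function name are simplifications, not the plugin's API):

type Doc = Record<string, unknown>;

// Split docs so each serialized request body stays under the budget.
function chunkBySerializedSize(docs: Doc[], maxChunkCharCount: number): Doc[][] {
  const chunks: Doc[][] = [];
  let chunk: Doc[] = [];
  let chunkChars = 0;
  for (const doc of docs) {
    // +1 accounts for the comma that joins documents in the request body
    const docChars = JSON.stringify(doc).length + 1;
    if (chunk.length === 0 || chunkChars + docChars < maxChunkCharCount) {
      chunk.push(doc);
      chunkChars += docChars;
    } else {
      // budget reached: close this chunk and start the next with the current doc
      chunks.push(chunk);
      chunk = [doc];
      chunkChars = docChars;
    }
  }
  if (chunk.length) {
    chunks.push(chunk);
  }
  return chunks;
}

Because an empty chunk always accepts a document, a document larger than the budget still ships alone in its own request; the createChunks unit tests below exercise exactly that case.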

Changed file: fileUpload plugin API docs (generated JSON): line-number shifts caused by the types.ts edits, plus the new failures signature

@@ -19,7 +19,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 24
+        "lineNumber": 31
       }
     },
     {
@@ -30,7 +30,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 25
+        "lineNumber": 32
       }
     },
     {
@@ -41,7 +41,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 26
+        "lineNumber": 33
       },
       "signature": [
         {
@@ -62,7 +62,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 27
+        "lineNumber": 34
       },
       "signature": [
         "any"
@@ -71,7 +71,7 @@
     ],
     "source": {
       "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-      "lineNumber": 23
+      "lineNumber": 30
     },
     "initialIsOpen": false
   },
@@ -237,7 +237,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 37
+        "lineNumber": 44
       }
     }
   ],
@@ -245,7 +245,7 @@
     "returnComment": [],
     "source": {
       "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-      "lineNumber": 37
+      "lineNumber": 44
     }
   },
   {
@@ -299,7 +299,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 39
+        "lineNumber": 46
       }
     },
     {
@@ -318,7 +318,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 40
+        "lineNumber": 47
       }
     },
     {
@@ -337,7 +337,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 41
+        "lineNumber": 48
       }
     },
     {
@@ -356,7 +356,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 42
+        "lineNumber": 49
       }
     }
   ],
@@ -364,7 +364,7 @@
     "returnComment": [],
     "source": {
       "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-      "lineNumber": 38
+      "lineNumber": 45
     }
   },
   {
@@ -394,7 +394,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 45
+        "lineNumber": 52
       }
     },
     {
@@ -407,7 +407,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 46
+        "lineNumber": 53
      }
     },
     {
@@ -420,7 +420,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 47
+        "lineNumber": 54
       }
     },
     {
@@ -433,7 +433,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 48
+        "lineNumber": 55
       }
     }
   ],
@@ -441,13 +441,13 @@
     "returnComment": [],
     "source": {
       "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-      "lineNumber": 44
+      "lineNumber": 51
     }
   }
 ],
 "source": {
   "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-  "lineNumber": 36
+  "lineNumber": 43
 },
 "initialIsOpen": false
 },
@@ -466,7 +466,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 11
+        "lineNumber": 18
       },
       "signature": [
         {
@@ -486,7 +486,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 12
+        "lineNumber": 19
       },
       "signature": [
         {
@@ -506,7 +506,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 13
+        "lineNumber": 20
       },
       "signature": [
         {
@@ -521,7 +521,7 @@
     ],
     "source": {
       "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-      "lineNumber": 10
+      "lineNumber": 17
     },
     "initialIsOpen": false
   },
@@ -540,7 +540,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 31
+        "lineNumber": 38
       },
       "signature": [
         "string | undefined"
@@ -554,7 +554,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 32
+        "lineNumber": 39
       },
       "signature": [
         "string | undefined"
@@ -568,7 +568,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 33
+        "lineNumber": 40
       },
       "signature": [
         {
@@ -583,7 +583,7 @@
     ],
     "source": {
       "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-      "lineNumber": 30
+      "lineNumber": 37
     },
     "initialIsOpen": false
   },
@@ -782,7 +782,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 17
+        "lineNumber": 24
       }
     },
     {
@@ -793,10 +793,17 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 18
+        "lineNumber": 25
       },
       "signature": [
-        "any[] | undefined"
+        {
+          "pluginId": "fileUpload",
+          "scope": "common",
+          "docId": "kibFileUploadPluginApi",
+          "section": "def-common.ImportFailure",
+          "text": "ImportFailure"
+        },
+        "[] | undefined"
       ]
     },
     {
@@ -807,7 +814,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 19
+        "lineNumber": 26
       },
       "signature": [
         "number | undefined"
@@ -821,7 +828,7 @@
       "description": [],
       "source": {
         "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-        "lineNumber": 20
+        "lineNumber": 27
       },
       "signature": [
         "any"
@@ -830,7 +837,7 @@
     ],
     "source": {
       "path": "x-pack/plugins/file_upload/public/importer/types.ts",
-      "lineNumber": 16
+      "lineNumber": 23
     },
     "initialIsOpen": false
   },

Changed file: geojson_importer tests (x-pack/plugins/file_upload/public/importer/geojson_importer/)

@@ -5,7 +5,7 @@
  * 2.0.
  */

-import { GeoJsonImporter, toEsDocs } from './geojson_importer';
+import { GeoJsonImporter, createChunks, toEsDoc } from './geojson_importer';
 import { ES_FIELD_TYPES } from '../../../../../../src/plugins/data/public';
 import '@loaders.gl/polyfills';
@@ -221,52 +221,79 @@ describe('previewFile', () => {
   });
 });

-describe('toEsDocs', () => {
-  test('should convert features to geo_point ES documents', () => {
-    const esDocs = toEsDocs(FEATURE_COLLECTION.features, ES_FIELD_TYPES.GEO_POINT);
-    expect(esDocs).toEqual([
-      {
-        coordinates: [-112.0372, 46.608058],
-        population: 200,
-      },
-    ]);
-  });
-
-  test('should convert features to geo_shape ES documents', () => {
-    const esDocs = toEsDocs(FEATURE_COLLECTION.features, ES_FIELD_TYPES.GEO_SHAPE);
-    expect(esDocs).toEqual([
-      {
-        coordinates: {
-          type: 'Point',
-          coordinates: [-112.0372, 46.608058],
-        },
-        population: 200,
-      },
-    ]);
-  });
-
-  test('should convert GeometryCollection feature to geo_shape ES documents', () => {
-    const esDocs = toEsDocs([GEOMETRY_COLLECTION_FEATURE], ES_FIELD_TYPES.GEO_SHAPE);
-    expect(esDocs).toEqual([
-      {
-        coordinates: {
-          type: 'GeometryCollection',
-          geometries: [
-            {
-              type: 'Point',
-              coordinates: [100.0, 0.0],
-            },
-            {
-              type: 'LineString',
-              coordinates: [
-                [101.0, 0.0],
-                [102.0, 1.0],
-              ],
-            },
-          ],
-        },
-        population: 200,
-      },
-    ]);
-  });
-});
+describe('toEsDoc', () => {
+  test('should convert feature to geo_point ES document', () => {
+    const esDoc = toEsDoc(FEATURE_COLLECTION.features[0], ES_FIELD_TYPES.GEO_POINT);
+    expect(esDoc).toEqual({
+      coordinates: [-112.0372, 46.608058],
+      population: 200,
+    });
+  });
+
+  test('should convert feature to geo_shape ES document', () => {
+    const esDoc = toEsDoc(FEATURE_COLLECTION.features[0], ES_FIELD_TYPES.GEO_SHAPE);
+    expect(esDoc).toEqual({
+      coordinates: {
+        type: 'Point',
+        coordinates: [-112.0372, 46.608058],
+      },
+      population: 200,
+    });
+  });
+
+  test('should convert GeometryCollection feature to geo_shape ES document', () => {
+    const esDoc = toEsDoc(GEOMETRY_COLLECTION_FEATURE, ES_FIELD_TYPES.GEO_SHAPE);
+    expect(esDoc).toEqual({
+      coordinates: {
+        type: 'GeometryCollection',
+        geometries: [
+          {
+            type: 'Point',
+            coordinates: [100.0, 0.0],
+          },
+          {
+            type: 'LineString',
+            coordinates: [
+              [101.0, 0.0],
+              [102.0, 1.0],
+            ],
+          },
+        ],
+      },
+      population: 200,
+    });
+  });
+});
+
+describe('createChunks', () => {
+  const GEOMETRY_COLLECTION_DOC_CHARS = JSON.stringify(
+    toEsDoc(GEOMETRY_COLLECTION_FEATURE, ES_FIELD_TYPES.GEO_SHAPE)
+  ).length;
+
+  const features = [
+    GEOMETRY_COLLECTION_FEATURE,
+    GEOMETRY_COLLECTION_FEATURE,
+    GEOMETRY_COLLECTION_FEATURE,
+    GEOMETRY_COLLECTION_FEATURE,
+    GEOMETRY_COLLECTION_FEATURE,
+  ];
+
+  test('should break features into chunks', () => {
+    const maxChunkCharCount = GEOMETRY_COLLECTION_DOC_CHARS * 3.5;
+    const chunks = createChunks(features, ES_FIELD_TYPES.GEO_SHAPE, maxChunkCharCount);
+    expect(chunks.length).toBe(2);
+    expect(chunks[0].length).toBe(3);
+    expect(chunks[1].length).toBe(2);
+  });
+
+  test('should break features into chunks containing only single feature when feature size is greater then maxChunkCharCount', () => {
+    const maxChunkCharCount = GEOMETRY_COLLECTION_DOC_CHARS * 0.8;
+    const chunks = createChunks(features, ES_FIELD_TYPES.GEO_SHAPE, maxChunkCharCount);
+    expect(chunks.length).toBe(5);
+    expect(chunks[0].length).toBe(1);
+    expect(chunks[1].length).toBe(1);
+    expect(chunks[2].length).toBe(1);
+    expect(chunks[3].length).toBe(1);
+    expect(chunks[4].length).toBe(1);
+  });
+});
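A quick check of the arithmetic these tests pin down: each of the five identical GeometryCollection docs serializes to d characters and costs d + 1 (comma included) inside a chunk. With a budget of 3.5 × d, the first chunk absorbs three docs (3d + 3 < 3.5d for any doc this size) and the fourth doc opens a new chunk, giving lengths [3, 2]. With a budget of 0.8 × d, a second doc never fits, but an empty chunk always accepts one, so all five features ship as single-doc chunks.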

Changed file: x-pack/plugins/file_upload/public/importer/geojson_importer/geojson_importer.ts

@@ -10,13 +10,13 @@ import { i18n } from '@kbn/i18n';
 // @ts-expect-error
 import { JSONLoader, loadInBatches } from './loaders';
 import { CreateDocsResponse, ImportResults } from '../types';
-import { callImportRoute, Importer, IMPORT_RETRIES } from '../importer';
+import { callImportRoute, Importer, IMPORT_RETRIES, MAX_CHUNK_CHAR_COUNT } from '../importer';
 import { ES_FIELD_TYPES } from '../../../../../../src/plugins/data/public';
 // @ts-expect-error
 import { geoJsonCleanAndValidate } from './geojson_clean_and_validate';
-import { ImportFailure, ImportResponse, MB } from '../../../common';
+import { ImportDoc, ImportFailure, ImportResponse, MB } from '../../../common';

-const IMPORT_CHUNK_SIZE_MB = 10 * MB;
+const BLOCK_SIZE_MB = 5 * MB;
 export const GEOJSON_FILE_TYPES = ['.json', '.geojson'];

 export interface GeoJsonPreview {
@@ -32,9 +32,11 @@ export class GeoJsonImporter extends Importer {
   private _iterator?: Iterator<unknown>;
   private _hasNext = true;
   private _features: Feature[] = [];
-  private _totalBytesProcessed = 0;
-  private _unimportedBytesProcessed = 0;
-  private _totalFeatures = 0;
+  private _totalBytesRead = 0;
+  private _totalBytesImported = 0;
+  private _blockSizeInBytes = 0;
+  private _totalFeaturesRead = 0;
+  private _totalFeaturesImported = 0;
   private _geometryTypesMap = new Map<string, boolean>();
   private _invalidFeatures: ImportFailure[] = [];
   private _prevBatchLastFeature?: Feature;
@@ -56,7 +58,7 @@ export class GeoJsonImporter extends Importer {
     return {
       features: [...this._features],
       previewCoverage: this._hasNext
-        ? Math.round((this._unimportedBytesProcessed / this._file.size) * 100)
+        ? Math.round((this._blockSizeInBytes / this._file.size) * 100)
         : 100,
       hasPoints: this._geometryTypesMap.has('Point') || this._geometryTypesMap.has('MultiPoint'),
       hasShapes:
@@ -90,17 +92,87 @@ export class GeoJsonImporter extends Importer {
     let success = true;
     const failures: ImportFailure[] = [...this._invalidFeatures];
     let error;
+    let importBlockPromise: Promise<ImportResults> | undefined;

+    // Read file in blocks to avoid loading too much of file into memory at a time
     while ((this._features.length > 0 || this._hasNext) && this._isActive) {
-      await this._readUntil(undefined, IMPORT_CHUNK_SIZE_MB);
+      await this._readUntil(undefined, BLOCK_SIZE_MB);

       if (!this._isActive) {
         return {
           success: false,
           failures,
-          docCount: this._totalFeatures,
+          docCount: this._totalFeaturesRead,
         };
       }

+      // wait for previous import call to finish before starting next import
+      if (importBlockPromise !== undefined) {
+        const importBlockResults = await importBlockPromise;
+        importBlockPromise = undefined;
+        if (importBlockResults.failures) {
+          failures.push(...importBlockResults.failures);
+        }
+        if (!importBlockResults.success) {
+          success = false;
+          error = importBlockResults.error;
+          break;
+        }
+      }
+
+      // Import block in chunks to avoid sending too much data to Elasticsearch at a time.
+      const chunks = createChunks(this._features, this._geoFieldType, MAX_CHUNK_CHAR_COUNT);
+      const blockSizeInBytes = this._blockSizeInBytes;
+
+      // reset block for next read
+      this._features = [];
+      this._blockSizeInBytes = 0;
+
+      importBlockPromise = this._importBlock(
+        id,
+        index,
+        pipelineId,
+        chunks,
+        blockSizeInBytes,
+        setImportProgress
+      );
+    }
+
+    // wait for last import call
+    if (importBlockPromise) {
+      const importBlockResults = await importBlockPromise;
+      if (importBlockResults.failures) {
+        failures.push(...importBlockResults.failures);
+      }
+      if (!importBlockResults.success) {
+        success = false;
+        error = importBlockResults.error;
+      }
+    }
+
+    setImportProgress(100);
+
+    return {
+      success,
+      failures,
+      docCount: this._totalFeaturesRead,
+      error,
+    };
+  }
+
+  private async _importBlock(
+    id: string,
+    index: string,
+    pipelineId: string,
+    chunks: ImportDoc[][],
+    blockSizeInBytes: number,
+    setImportProgress: (progress: number) => void
+  ): Promise<ImportResults> {
+    let success = true;
+    const failures: ImportFailure[] = [];
+    let error;

+    for (let i = 0; i < chunks.length; i++) {
       let retries = IMPORT_RETRIES;
       let resp: ImportResponse = {
         success: false,
@@ -110,17 +182,12 @@ export class GeoJsonImporter extends Importer {
         index: '',
         pipelineId: '',
       };
-      const data = toEsDocs(this._features, this._geoFieldType);
-      const progress = Math.round((this._totalBytesProcessed / this._file.size) * 100);
-      this._features = [];
-      this._unimportedBytesProcessed = 0;

       while (resp.success === false && retries > 0) {
         try {
           resp = await callImportRoute({
             id,
             index,
-            data,
+            data: chunks[i],
             settings: {},
             mappings: {},
             ingestPipeline: {
@@ -128,6 +195,13 @@ export class GeoJsonImporter extends Importer {
             },
           });

+          if (!this._isActive) {
+            return {
+              success: false,
+              failures,
+            };
+          }
+
           if (retries < IMPORT_RETRIES) {
             // eslint-disable-next-line no-console
             console.log(`Retrying import ${IMPORT_RETRIES - retries}`);
@@ -141,30 +215,42 @@ export class GeoJsonImporter extends Importer {
         }
       }

-      failures.push(...resp.failures);
+      if (resp.failures && resp.failures.length) {
+        // failure.item is the document position in the chunk passed to import endpoint.
+        // Need to update failure.item to reflect the actual feature position in the file.
+        // e.g. item 3 in chunk is actually item 20003
+        for (let f = 0; f < resp.failures.length; f++) {
+          const failure = resp.failures[f];
+          failure.item += this._totalFeaturesImported;
+        }
+        failures.push(...resp.failures);
+      }

-      if (!resp.success) {
+      if (resp.success) {
+        this._totalFeaturesImported += chunks[i].length;
+
+        // Advance block percentage in equal increments
+        // even though chunks are not identical in size.
+        // Reason being that chunk size does not exactly correlate to bytes read from file
+        // because features are converted to elasticsearch documents which changes the size.
+        const chunkProgress = (i + 1) / chunks.length;
+        const totalBytesImported = this._totalBytesImported + blockSizeInBytes * chunkProgress;
+        const progressPercent = (totalBytesImported / this._file.size) * 100;
+        setImportProgress(Math.round(progressPercent * 10) / 10);
+      } else {
         success = false;
         error = resp.error;
         break;
       }
-      setImportProgress(progress);
     }

-    const result: ImportResults = {
+    this._totalBytesImported += blockSizeInBytes;
+
+    return {
       success,
       failures,
-      docCount: this._totalFeatures,
       error,
     };
-    if (success) {
-      setImportProgress(100);
-    } else {
-      result.error = error;
-    }
-    return result;
   }

   private async _readUntil(rowLimit?: number, sizeLimit?: number) {
@@ -172,7 +258,7 @@
       this._isActive &&
       this._hasNext &&
       (rowLimit === undefined || this._features.length < rowLimit) &&
-      (sizeLimit === undefined || this._unimportedBytesProcessed < sizeLimit)
+      (sizeLimit === undefined || this._blockSizeInBytes < sizeLimit)
     ) {
       await this._next();
     }
@@ -200,9 +286,9 @@ export class GeoJsonImporter extends Importer {
     }

     if ('bytesUsed' in batch) {
-      const bytesRead = batch.bytesUsed - this._totalBytesProcessed;
-      this._unimportedBytesProcessed += bytesRead;
-      this._totalBytesProcessed = batch.bytesUsed;
+      const bytesRead = batch.bytesUsed - this._totalBytesRead;
+      this._blockSizeInBytes += bytesRead;
+      this._totalBytesRead = batch.bytesUsed;
     }

     const rawFeatures: unknown[] = this._prevBatchLastFeature ? [this._prevBatchLastFeature] : [];
@@ -210,7 +296,7 @@ export class GeoJsonImporter extends Importer {
     const isLastBatch = batch.batchType === 'root-object-batch-complete';
     if (isLastBatch) {
       // Handle single feature geoJson
-      if (this._totalFeatures === 0) {
+      if (this._totalFeaturesRead === 0) {
         rawFeatures.push(batch.container);
       }
     } else {
@@ -225,10 +311,10 @@ export class GeoJsonImporter extends Importer {
         continue;
       }

-      this._totalFeatures++;
+      this._totalFeaturesRead++;
       if (!rawFeature.geometry || !rawFeature.geometry.type) {
         this._invalidFeatures.push({
-          item: this._totalFeatures,
+          item: this._totalFeaturesRead,
           reason: i18n.translate('xpack.fileUpload.geojsonImporter.noGeometry', {
             defaultMessage: 'Feature does not contain required field "geometry"',
           }),
@@ -252,21 +338,47 @@ export class GeoJsonImporter extends Importer {
     }
   }
 }

-export function toEsDocs(
+export function createChunks(
   features: Feature[],
+  geoFieldType: ES_FIELD_TYPES.GEO_POINT | ES_FIELD_TYPES.GEO_SHAPE,
+  maxChunkCharCount: number
+): ImportDoc[][] {
+  const chunks: ImportDoc[][] = [];
+  let chunk: ImportDoc[] = [];
+  let chunkChars = 0;
+  for (let i = 0; i < features.length; i++) {
+    const doc = toEsDoc(features[i], geoFieldType);
+    const docChars = JSON.stringify(doc).length + 1; // +1 adds CHAR for comma once document is in list
+    if (chunk.length === 0 || chunkChars + docChars < maxChunkCharCount) {
+      // add ES document to current chunk
+      chunk.push(doc);
+      chunkChars += docChars;
+    } else {
+      // chunk boundary found, start new chunk
+      chunks.push(chunk);
+      chunk = [doc];
+      chunkChars = docChars;
+    }
+  }
+  if (chunk.length) {
+    chunks.push(chunk);
+  }
+  return chunks;
+}
+
+export function toEsDoc(
+  feature: Feature,
   geoFieldType: ES_FIELD_TYPES.GEO_POINT | ES_FIELD_TYPES.GEO_SHAPE
 ) {
-  const esDocs = [];
-  for (let i = 0; i < features.length; i++) {
-    const feature = features[i];
-    const properties = feature.properties ? feature.properties : {};
-    esDocs.push({
-      coordinates:
-        geoFieldType === ES_FIELD_TYPES.GEO_SHAPE
-          ? feature.geometry
-          : (feature.geometry as Point).coordinates,
-      ...properties,
-    });
-  }
-  return esDocs;
+  const properties = feature.properties ? feature.properties : {};
+  return {
+    coordinates:
+      geoFieldType === ES_FIELD_TYPES.GEO_SHAPE
+        ? feature.geometry
+        : (feature.geometry as Point).coordinates,
+    ...properties,
+  };
 }
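Besides chunking, import() now pipelines work: it kicks off _importBlock without awaiting it, reads the next block from the file, and only then awaits the in-flight import. A distilled TypeScript sketch of that overlap pattern, with readBlock and importBlock standing in for _readUntil and _importBlock (the real loop also aggregates failures and bails out on error):

async function pipelinedImport(
  readBlock: () => Promise<string[] | null>,
  importBlock: (docs: string[]) => Promise<void>
): Promise<void> {
  let inFlight: Promise<void> | undefined;
  for (let block = await readBlock(); block !== null; block = await readBlock()) {
    if (inFlight) {
      // the previous import ran concurrently with the readBlock() above;
      // wait for it before starting the next request
      await inFlight;
    }
    inFlight = importBlock(block);
  }
  if (inFlight) {
    await inFlight; // drain the final import
  }
}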

Changed file: x-pack/plugins/file_upload/public/importer/importer.ts

@@ -21,7 +21,7 @@ import {
 import { CreateDocsResponse, IImporter, ImportResults } from './types';

 const CHUNK_SIZE = 5000;
-const MAX_CHUNK_CHAR_COUNT = 1000000;
+export const MAX_CHUNK_CHAR_COUNT = 1000000;
 export const IMPORT_RETRIES = 5;
 const STRING_CHUNKS_MB = 100;
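A note on the budget this export now shares: String.prototype.length counts UTF-16 code units, and GeoJSON property payloads are typically ASCII (one byte per code unit in UTF-8), so a 1,000,000-character chunk serializes to roughly 1MB on the wire, the limit in the PR title. Exporting the constant lets GeoJsonImporter apply the same budget the base Importer already uses for its own chunking.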

Changed file: x-pack/plugins/file_upload/public/importer/types.ts

@@ -5,7 +5,14 @@
  * 2.0.
  */

-import { IngestPipeline, ImportDoc, ImportResponse, Mappings, Settings } from '../../common';
+import {
+  ImportFailure,
+  IngestPipeline,
+  ImportDoc,
+  ImportResponse,
+  Mappings,
+  Settings,
+} from '../../common';

 export interface ImportConfig {
   settings: Settings;
@@ -15,7 +22,7 @@ export interface ImportConfig {

 export interface ImportResults {
   success: boolean;
-  failures?: any[];
+  failures?: ImportFailure[];
   docCount?: number;
   error?: any;
 }
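With failures typed as ImportFailure[] instead of any[], consumers get type-checked access to the failure fields. A hypothetical consumer sketch (formatFailures is not part of the plugin; item and reason are the fields the geojson importer populates above):

import type { ImportResults } from './types';

// Hypothetical helper: summarize import failures for display. `item` is the
// feature's position in the file (after this commit's offset fix) and
// `reason` is the human-readable cause.
function formatFailures(results: ImportResults): string[] {
  return (results.failures ?? []).map(
    (failure) => `Feature ${failure.item}: ${failure.reason}`
  );
}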