[ML] File upload api refactor (#210865)

Adds a v2 version of the file upload api which splits the upload
initialisation step out of the data upload api.
Previously the import data API would behave differently depending on
whether an ID was passed to it. If an ID was not present, the api would
"initialize the upload" by creating the index, mappings and pipeline.
Subsequent calls to the api would then pass in an ID as well as the data.
The ID being present meant the data would be ingested.
The ID had no purpose other than signifying whether this was the
initial call to create the index or a subsequent call to ingest the
data.
This change adds a new `initialize_import` api which is called first to
create the index, mappings and ingest pipelines.
Subsequent calls to the `import` api behave as before and the data is
ingested.

A temporary v1 version of the `import` api has been kept for backwards
compatibility during upgrades.

The `initialize_import` api also creates multiple ingest pipelines by
default, improving on the previous "hacked in" approach of passing two
sets of pipelines to it for backwards compatibility.
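
Roughly, the new client-side flow looks like this. This is a minimal sketch based on the `IImporter` interface in this change; the index name, input variables and error handling are placeholders, and the file data is assumed to have already been read into the importer.

import type { IImporter } from '@kbn/file-upload-plugin/public/importer/types';
import type { IngestPipeline } from '@kbn/file-upload-plugin/common/types';
import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';

// Sketch of the v2 two-step flow (placeholder names throughout).
async function uploadExample(
  importer: IImporter,
  mappings: MappingTypeMapping,
  pipelines: Array<IngestPipeline | undefined>
) {
  // Step 1: create the index, mappings and one ingest pipeline per file.
  const initResp = await importer.initializeImport('my-index', {}, mappings, pipelines);
  if (initResp.success === false) {
    throw new Error('import initialization failed');
  }
  // Step 2: ingest the data; only the index and a pipeline ID are needed, no upload ID.
  return importer.import('my-index', initResp.pipelineIds[0], (progress: number) => {
    // report upload progress (0-100)
  });
}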
James Gowdy 2025-02-26 10:39:30 +00:00 committed by GitHub
parent 14e7f6007e
commit 0121f4b87b
15 changed files with 386 additions and 282 deletions


@ -84,7 +84,7 @@ export async function importData(props: Props, config: Config, setState: (state:
let settings = {};
let mappings = {};
let pipeline;
const pipelines = [];
try {
settings = JSON.parse(indexSettingsString);
@ -108,7 +108,7 @@ export async function importData(props: Props, config: Config, setState: (state:
try {
if (createPipeline) {
pipeline = JSON.parse(pipelineString) as IngestPipeline;
pipelines.push(JSON.parse(pipelineString) as IngestPipeline);
}
} catch (error) {
success = false;
@ -178,7 +178,12 @@ export async function importData(props: Props, config: Config, setState: (state:
return;
}
const initializeImportResp = await importer.initializeImport(index, settings, mappings, pipeline);
const initializeImportResp = await importer.initializeImport(
index,
settings,
mappings,
pipelines
);
if (initializeImportResp.success === false) {
errors.push(initializeImportResp.error);
@ -198,16 +203,16 @@ export async function importData(props: Props, config: Config, setState: (state:
});
if (createPipeline) {
const pipelineCreated = initializeImportResp.pipelineId !== undefined;
const pipelinesCreated = initializeImportResp.pipelineIds.length > 0;
if (indexCreated) {
setState({
ingestPipelineCreatedStatus: pipelineCreated
ingestPipelineCreatedStatus: pipelinesCreated
? IMPORT_STATUS.COMPLETE
: IMPORT_STATUS.FAILED,
pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '',
pipelineId: pipelinesCreated ? initializeImportResp.pipelineIds[0] : '',
});
}
success = indexCreated && pipelineCreated;
success = indexCreated && pipelinesCreated;
} else {
success = indexCreated;
}
@ -218,9 +223,8 @@ export async function importData(props: Props, config: Config, setState: (state:
}
const importResp = await importer.import(
initializeImportResp.id,
index,
pipelineId ?? initializeImportResp.pipelineId,
pipelineId ?? initializeImportResp.pipelineIds[0],
(progress: number) => {
setState({
uploadProgress: progress,


@ -13,7 +13,10 @@ import { switchMap, combineLatest, BehaviorSubject, of } from 'rxjs';
import type { HttpSetup } from '@kbn/core/public';
import type { IImporter } from '@kbn/file-upload-plugin/public/importer/types';
import type { DataViewsServicePublic } from '@kbn/data-views-plugin/public/types';
import type { ImportResponse, IngestPipeline } from '@kbn/file-upload-plugin/common/types';
import type {
IngestPipeline,
InitializeImportResponse,
} from '@kbn/file-upload-plugin/common/types';
import type {
IndicesIndexSettings,
MappingTypeMapping,
@ -64,7 +67,7 @@ export class FileManager {
private mappingsCheckSubscription: Subscription;
private settings;
private mappings: MappingTypeMapping | null = null;
private pipelines: IngestPipeline[] | null = null;
private pipelines: Array<IngestPipeline | undefined> = [];
private inferenceId: string | null = null;
private importer: IImporter | null = null;
private timeFieldName: string | undefined | null = null;
@ -221,7 +224,7 @@ export class FileManager {
return createMergedMappings(files);
}
private getPipelines(): IngestPipeline[] {
private getPipelines(): Array<IngestPipeline | undefined> {
const files = this.getFiles();
return files.map((file) => file.getPipeline());
}
@ -256,29 +259,32 @@ export class FileManager {
});
}
const createPipelines = this.pipelines.length > 0;
this.setStatus({
indexCreated: STATUS.STARTED,
pipelineCreated: STATUS.STARTED,
pipelineCreated: createPipelines ? STATUS.STARTED : STATUS.NA,
});
let indexCreated = false;
let pipelineCreated = false;
let initializeImportResp: ImportResponse | undefined;
let pipelinesCreated = false;
let initializeImportResp: InitializeImportResponse | undefined;
try {
initializeImportResp = await this.importer.initializeImport(
indexName,
this.settings,
this.mappings,
this.pipelines[0],
this.pipelines
);
this.timeFieldName = this.importer.getTimeField();
indexCreated = initializeImportResp.index !== undefined;
pipelineCreated = initializeImportResp.pipelineId !== undefined;
pipelinesCreated = initializeImportResp.pipelineIds.length > 0;
this.setStatus({
indexCreated: indexCreated ? STATUS.COMPLETED : STATUS.FAILED,
pipelineCreated: pipelineCreated ? STATUS.COMPLETED : STATUS.FAILED,
...(createPipelines
? { pipelineCreated: pipelinesCreated ? STATUS.COMPLETED : STATUS.FAILED }
: {}),
});
if (initializeImportResp.error) {
@ -299,7 +305,11 @@ export class FileManager {
return null;
}
if (!indexCreated || !pipelineCreated || !initializeImportResp) {
if (
indexCreated === false ||
(createPipelines && pipelinesCreated === false) ||
!initializeImportResp
) {
return null;
}
@ -309,16 +319,12 @@ export class FileManager {
// import data
const files = this.getFiles();
const createdPipelineIds = initializeImportResp.pipelineIds;
try {
await Promise.all(
files.map(async (file, i) => {
await file.import(
initializeImportResp!.id,
indexName,
this.mappings!,
`${indexName}-${i}-pipeline`
);
await file.import(indexName, this.mappings!, createdPipelineIds[i] ?? undefined);
})
);
} catch (error) {
@ -345,9 +351,7 @@ export class FileManager {
this.setStatus({
pipelinesDeleted: STATUS.STARTED,
});
await this.importer.deletePipelines(
this.pipelines.map((p, i) => `${indexName}-${i}-pipeline`)
);
await this.importer.deletePipelines();
this.setStatus({
pipelinesDeleted: STATUS.COMPLETED,
});
@ -461,6 +465,9 @@ export class FileManager {
};
this.pipelines.forEach((pipeline) => {
if (pipeline === undefined) {
return;
}
pipeline.processors.push({
set: {
field: 'content',


@ -169,13 +169,8 @@ export class FileWrapper {
public getMappings() {
return this.analyzedFile$.getValue().results?.mappings;
}
public getPipeline(): IngestPipeline {
return (
this.analyzedFile$.getValue().results?.ingest_pipeline ?? {
description: '',
processors: [],
}
);
public getPipeline(): IngestPipeline | undefined {
return this.analyzedFile$.getValue().results?.ingest_pipeline;
}
public getFormat() {
return this.analyzedFile$.getValue().results?.format;
@ -184,7 +179,7 @@ export class FileWrapper {
return this.analyzedFile$.getValue().data;
}
public async import(id: string, index: string, mappings: MappingTypeMapping, pipelineId: string) {
public async import(index: string, mappings: MappingTypeMapping, pipelineId: string | undefined) {
this.setStatus({ importStatus: STATUS.STARTED });
const format = this.analyzedFile$.getValue().results!.format;
const importer = await this.fileUpload.importerFactory(format, {
@ -192,7 +187,8 @@ export class FileWrapper {
multilineStartPattern: this.analyzedFile$.getValue().results!.multiline_start_pattern,
});
importer.initializeWithoutCreate(index, mappings, this.getPipeline());
const ingestPipeline = this.getPipeline();
importer.initializeWithoutCreate(index, mappings, ingestPipeline ? [ingestPipeline] : []);
const data = this.getData();
if (data === null) {
this.setStatus({ importStatus: STATUS.FAILED });
@ -200,7 +196,7 @@ export class FileWrapper {
}
importer.read(data);
try {
const resp = await importer.import(id, index, pipelineId, (p) => {
const resp = await importer.import(index, pipelineId, (p) => {
this.setStatus({ importProgress: p });
});
this.setStatus({ docCount: resp.docCount, importStatus: STATUS.COMPLETED });


@ -98,10 +98,19 @@ export interface HasImportPermission {
export type InputData = any[];
export interface ImportResponse {
export interface InitializeImportResponse {
success: boolean;
id: string;
index?: string;
index: string;
pipelineIds: Array<string | undefined>;
error?: {
error: estypes.ErrorCause;
};
}
export interface ImportResponse {
success: boolean;
index: string;
pipelineId?: string;
docCount: number;
failures: ImportFailure[];
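
For illustration, responses from the two APIs might look like the following (invented values; the v2 import response is shown as a plain object since the rest of the `ImportResponse` interface is not visible in this hunk):

import type { InitializeImportResponse } from '@kbn/file-upload-plugin/common/types';

// Invented example values only.
const initializeResponse: InitializeImportResponse = {
  success: true,
  index: 'my-index',
  // one entry per supplied pipeline; undefined where a file had no pipeline
  pipelineIds: ['my-index-0-pipeline', undefined],
};

const importResponse = {
  success: true,
  index: 'my-index',
  pipelineId: 'my-index-0-pipeline',
  docCount: 1000,
  failures: [],
};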


@ -109,10 +109,7 @@ export class GeoUploadWizard extends Component<FileUploadComponentProps, State>
},
},
};
const ingestPipeline = {
description: '',
processors: [],
};
this.setState({
importStatus: i18n.translate('xpack.fileUpload.geoUploadWizard.dataIndexingStarted', {
defaultMessage: 'Creating index: {indexName}',
@ -125,7 +122,7 @@ export class GeoUploadWizard extends Component<FileUploadComponentProps, State>
this.state.indexName,
{},
mappings,
ingestPipeline
[]
);
if (!this._isMounted) {
return;
@ -147,9 +144,8 @@ export class GeoUploadWizard extends Component<FileUploadComponentProps, State>
});
this._geoFileImporter.setSmallChunks(this.state.smallChunks);
const importResults = await this._geoFileImporter.import(
initializeImportResp.id,
this.state.indexName,
initializeImportResp.pipelineId,
initializeImportResp.pipelineIds[0],
(progress) => {
if (this._isMounted) {
this.setState({


@ -11,11 +11,12 @@ import { i18n } from '@kbn/i18n';
import { ES_FIELD_TYPES } from '@kbn/data-plugin/public';
import { GeoFileImporter, GeoFilePreview } from './types';
import { CreateDocsResponse, ImportResults } from '../types';
import { callImportRoute, Importer, IMPORT_RETRIES, MAX_CHUNK_CHAR_COUNT } from '../importer';
import { Importer, IMPORT_RETRIES, MAX_CHUNK_CHAR_COUNT } from '../importer';
import { MB } from '../../../common/constants';
import type { ImportDoc, ImportFailure, ImportResponse } from '../../../common/types';
import { geoJsonCleanAndValidate } from './geojson_clean_and_validate';
import { createChunks } from './create_chunks';
import { callImportRoute } from '../routes';
const BLOCK_SIZE_MB = 5 * MB;
@ -80,16 +81,15 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
}
public async import(
id: string,
index: string,
pipelineId: string | undefined,
pipelineId: string,
setImportProgress: (progress: number) => void
): Promise<ImportResults> {
if (!id || !index) {
if (!index) {
return {
success: false,
error: i18n.translate('xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage', {
defaultMessage: 'no ID or index supplied',
error: i18n.translate('xpack.fileUpload.import.noIndexSuppliedErrorMessage', {
defaultMessage: 'No index supplied',
}),
};
}
@ -134,7 +134,6 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
this._blockSizeInBytes = 0;
importBlockPromise = this._importBlock(
id,
index,
pipelineId,
chunks,
@ -167,9 +166,8 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
}
private async _importBlock(
id: string,
index: string,
pipelineId: string | undefined,
pipelineId: string,
chunks: ImportDoc[][],
blockSizeInBytes: number,
setImportProgress: (progress: number) => void
@ -184,24 +182,15 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
success: false,
failures: [],
docCount: 0,
id: '',
index: '',
pipelineId: '',
};
while (resp.success === false && retries > 0) {
try {
resp = await callImportRoute({
id,
index,
ingestPipelineId: pipelineId,
data: chunks[i],
settings: {},
mappings: {},
ingestPipeline:
pipelineId !== undefined
? {
id: pipelineId,
}
: undefined,
});
if (!this._isActive) {


@ -24,6 +24,7 @@ import type {
IngestPipelineWrapper,
} from '../../common/types';
import { CreateDocsResponse, IImporter, ImportResults } from './types';
import { callImportRoute, callInitializeImportRoute } from './routes';
const CHUNK_SIZE = 5000;
const REDUCED_CHUNK_SIZE = 100;
@ -36,7 +37,7 @@ export abstract class Importer implements IImporter {
protected _docArray: ImportDoc[] = [];
protected _chunkSize = CHUNK_SIZE;
private _index: string | undefined;
private _pipeline: IngestPipeline | undefined;
private _pipelines: IngestPipelineWrapper[] = [];
private _timeFieldName: string | undefined;
private _initialized = false;
@ -82,42 +83,30 @@ export abstract class Importer implements IImporter {
protected abstract _createDocs(t: string, isLastPart: boolean): CreateDocsResponse<ImportDoc>;
public async initializeImport(
private _initialize(
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
pipeline: IngestPipeline | undefined,
createPipelines?: IngestPipeline[]
pipelines: Array<IngestPipeline | undefined>
) {
let ingestPipelineWrapper: IngestPipelineWrapper | undefined;
if (pipeline !== undefined) {
updatePipelineTimezone(pipeline);
for (let i = 0; i < pipelines.length; i++) {
const pipeline = pipelines[i];
if (pipeline !== undefined) {
updatePipelineTimezone(pipeline);
if (pipelineContainsSpecialProcessors(pipeline)) {
// pipeline contains processors which we know are slow
// so reduce the chunk size significantly to avoid timeouts
this._chunkSize = REDUCED_CHUNK_SIZE;
if (pipelineContainsSpecialProcessors(pipeline)) {
// pipeline contains processors which we know are slow
// so reduce the chunk size significantly to avoid timeouts
this._chunkSize = REDUCED_CHUNK_SIZE;
}
}
// if no pipeline has been supplied,
// send an empty object
ingestPipelineWrapper = {
id: `${index}-pipeline`,
pipeline,
};
}
let createPipelinesWrappers: IngestPipelineWrapper[] | undefined;
if (createPipelines) {
createPipelinesWrappers = createPipelines.map((p, i) => {
return {
id: `${index}-${i}-pipeline`,
pipeline: p,
};
this._pipelines.push({
id: `${index}-${i}-pipeline`,
pipeline,
});
}
this._index = index;
this._pipeline = pipeline;
// if an @timestamp field has been added to the
// mappings, use this field as the time field.
@ -128,68 +117,48 @@ export abstract class Importer implements IImporter {
: undefined;
this._initialized = true;
}
return await callImportRoute({
id: undefined,
public async initializeImport(
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
pipelines: Array<IngestPipeline | undefined>
) {
this._initialize(index, mappings, pipelines);
return await callInitializeImportRoute({
index,
data: [],
settings,
mappings,
ingestPipeline: ingestPipelineWrapper,
createPipelines: createPipelinesWrappers,
ingestPipelines: this._pipelines,
});
}
public async initializeWithoutCreate(
index: string,
mappings: MappingTypeMapping,
pipeline: IngestPipeline | undefined
pipelines: IngestPipeline[]
) {
if (pipeline !== undefined) {
if (pipelineContainsSpecialProcessors(pipeline)) {
// pipeline contains processors which we know are slow
// so reduce the chunk size significantly to avoid timeouts
this._chunkSize = REDUCED_CHUNK_SIZE;
}
}
this._index = index;
this._pipeline = pipeline;
// if an @timestamp field has been added to the
// mappings, use this field as the time field.
// This relies on the field being populated by
// the ingest pipeline on ingest
this._timeFieldName = isPopulatedObject(mappings.properties, [DEFAULT_TIME_FIELD])
? DEFAULT_TIME_FIELD
: undefined;
this._initialized = true;
this._initialize(index, mappings, pipelines);
}
public async import(
id: string,
index: string,
pipelineId: string | undefined,
ingestPipelineId: string,
setImportProgress: (progress: number) => void
): Promise<ImportResults> {
if (!id || !index) {
if (!index) {
return {
success: false,
error: i18n.translate('xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage', {
defaultMessage: 'no ID or index supplied',
error: i18n.translate('xpack.fileUpload.import.noIndexSuppliedErrorMessage', {
defaultMessage: 'No index supplied',
}),
};
}
const chunks = createDocumentChunks(this._docArray, this._chunkSize);
const ingestPipeline: IngestPipelineWrapper | undefined = pipelineId
? {
id: pipelineId,
}
: undefined;
let success = true;
const failures: ImportFailure[] = [];
let error;
@ -200,7 +169,6 @@ export abstract class Importer implements IImporter {
success: false,
failures: [],
docCount: 0,
id: '',
index: '',
pipelineId: '',
};
@ -208,12 +176,9 @@ export abstract class Importer implements IImporter {
while (resp.success === false && retries > 0) {
try {
resp = await callImportRoute({
id,
index,
ingestPipelineId,
data: chunks[i],
settings: {},
mappings: {},
ingestPipeline,
});
if (retries < IMPORT_RETRIES) {
@ -269,7 +234,8 @@ export abstract class Importer implements IImporter {
}
public async previewIndexTimeRange() {
if (this._initialized === false || this._pipeline === undefined) {
const ingestPipeline = this._pipelines[0];
if (this._initialized === false || ingestPipeline?.pipeline === undefined) {
throw new Error('Import has not been initialized');
}
@ -280,7 +246,7 @@ export abstract class Importer implements IImporter {
const body = JSON.stringify({
docs: firstDocs.concat(lastDocs),
pipeline: this._pipeline,
pipeline: ingestPipeline.pipeline,
timeField: this._timeFieldName,
});
return await getHttp().fetch<{ start: number | null; end: number | null }>({
@ -291,14 +257,15 @@ export abstract class Importer implements IImporter {
});
}
public async deletePipelines(pipelineIds: string[]) {
// remove_pipelines
// const body = JSON.stringify({
// pipelineIds,
// });
public async deletePipelines() {
const ids = this._pipelines.filter((p) => p.pipeline !== undefined).map((p) => p.id);
if (ids.length === 0) {
return [];
}
return await getHttp().fetch<IngestDeletePipelineResponse[]>({
path: `/internal/file_upload/remove_pipelines/${pipelineIds.join(',')}`,
path: `/internal/file_upload/remove_pipelines/${ids.join(',')}`,
method: 'DELETE',
version: '1',
});
@ -391,39 +358,3 @@ function pipelineContainsSpecialProcessors(pipeline: IngestPipeline) {
const specialProcessors = ['inference', 'enrich'];
return intersection(specialProcessors, keys).length !== 0;
}
export function callImportRoute({
id,
index,
data,
settings,
mappings,
ingestPipeline,
createPipelines,
}: {
id: string | undefined;
index: string;
data: ImportDoc[];
settings: IndicesIndexSettings;
mappings: MappingTypeMapping;
ingestPipeline: IngestPipelineWrapper | undefined;
createPipelines?: IngestPipelineWrapper[];
}) {
const query = id !== undefined ? { id } : {};
const body = JSON.stringify({
index,
data,
settings,
mappings,
ingestPipeline,
createPipelines,
});
return getHttp().fetch<ImportResponse>({
path: `/internal/file_upload/import`,
method: 'POST',
version: '1',
query,
body,
});
}


@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type {
IndicesIndexSettings,
MappingTypeMapping,
} from '@elastic/elasticsearch/lib/api/types';
import type {
IngestPipelineWrapper,
InitializeImportResponse,
ImportDoc,
ImportResponse,
} from '../../common/types';
import { getHttp } from '../kibana_services';
interface CallInitializeImportRoute {
index: string;
settings: IndicesIndexSettings;
mappings: MappingTypeMapping;
ingestPipelines?: IngestPipelineWrapper[];
}
interface CallImportRoute {
index: string;
ingestPipelineId: string;
data: ImportDoc[];
}
export function callInitializeImportRoute({
index,
settings,
mappings,
ingestPipelines,
}: CallInitializeImportRoute) {
const body = JSON.stringify({
index,
settings,
mappings,
ingestPipelines,
});
return getHttp().fetch<InitializeImportResponse>({
path: `/internal/file_upload/initialize_import`,
method: 'POST',
version: '1',
body,
});
}
export function callImportRoute({ index, data, ingestPipelineId }: CallImportRoute) {
const body = JSON.stringify({
index,
ingestPipelineId,
data,
});
return getHttp().fetch<ImportResponse>({
path: `/internal/file_upload/import`,
method: 'POST',
version: '2',
body,
});
}
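
A rough sketch of how a caller could use these two helpers together, assuming the caller sits next to routes.ts so the relative import paths resolve; the Importer class further down drives them in the same way.

import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
import type { ImportDoc, IngestPipelineWrapper } from '../../common/types';
import { callInitializeImportRoute, callImportRoute } from './routes';

// Hypothetical helper: initialize once, then send the data chunk by chunk.
async function importInChunks(
  index: string,
  mappings: MappingTypeMapping,
  ingestPipelines: IngestPipelineWrapper[],
  chunks: ImportDoc[][]
) {
  const initResp = await callInitializeImportRoute({ index, settings: {}, mappings, ingestPipelines });
  for (const chunk of chunks) {
    await callImportRoute({
      index,
      ingestPipelineId: initResp.pipelineIds[0] ?? '',
      data: chunk,
    });
  }
}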


@ -11,7 +11,12 @@ import type {
MappingTypeMapping,
} from '@elastic/elasticsearch/lib/api/types';
import type { ImportFailure, IngestPipeline, ImportDoc, ImportResponse } from '../../common/types';
import type {
ImportFailure,
IngestPipeline,
ImportDoc,
InitializeImportResponse,
} from '../../common/types';
export interface ImportConfig {
settings: IndicesIndexSettings;
@ -44,23 +49,21 @@ export interface IImporter {
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
pipeline: IngestPipeline | undefined,
createPipelines?: IngestPipeline[]
): Promise<ImportResponse>;
pipeline: Array<IngestPipeline | undefined>
): Promise<InitializeImportResponse>;
initializeWithoutCreate(
index: string,
mappings: MappingTypeMapping,
pipeline: IngestPipeline | undefined
pipelines: IngestPipeline[]
): void;
import(
id: string,
index: string,
pipelineId: string | undefined,
ingestPipelineId: string | undefined,
setImportProgress: (progress: number) => void
): Promise<ImportResults>;
initialized(): boolean;
getIndex(): string | undefined;
getTimeField(): string | undefined;
previewIndexTimeRange(): Promise<{ start: number | null; end: number | null }>;
deletePipelines(pipelineIds: string[]): Promise<IngestDeletePipelineResponse[]>;
deletePipelines(): Promise<IngestDeletePipelineResponse[]>;
}


@ -5,7 +5,7 @@
* 2.0.
*/
import { IScopedClusterClient } from '@kbn/core/server';
import type { IScopedClusterClient } from '@kbn/core/server';
import type {
BulkRequest,
IndicesCreateRequest,
@ -13,56 +13,72 @@ import type {
MappingTypeMapping,
} from '@elastic/elasticsearch/lib/api/types';
import { INDEX_META_DATA_CREATED_BY } from '../common/constants';
import { ImportResponse, ImportFailure, InputData, IngestPipelineWrapper } from '../common/types';
import type {
ImportResponse,
ImportFailure,
InputData,
IngestPipelineWrapper,
InitializeImportResponse,
} from '../common/types';
export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
async function importData(
id: string | undefined,
async function initializeImport(
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
ingestPipeline: IngestPipelineWrapper | undefined,
createPipelines: IngestPipelineWrapper[],
data: InputData
): Promise<ImportResponse> {
ingestPipelines: IngestPipelineWrapper[]
): Promise<InitializeImportResponse> {
let createdIndex;
let createdPipelineId;
const docCount = data.length;
const createdPipelineIds: Array<string | undefined> = [];
const id = generateId();
try {
const pipelineId = ingestPipeline?.id;
const pipeline = ingestPipeline?.pipeline;
await createIndex(index, settings, mappings);
createdIndex = index;
if (id === undefined) {
// first chunk of data, create the index and id to return
id = generateId();
await createIndex(index, settings, mappings);
createdIndex = index;
// create the pipeline if one has been supplied
if (createPipelines !== undefined) {
for (const p of createPipelines) {
const resp = await createPipeline(p.id, p.pipeline);
if (resp.acknowledged !== true) {
throw resp;
}
// create the pipeline if one has been supplied
if (ingestPipelines !== undefined) {
for (const p of ingestPipelines) {
if (p.pipeline === undefined) {
createdPipelineIds.push(undefined);
continue;
}
} else if (pipelineId !== undefined) {
const resp = await createPipeline(pipelineId, pipeline);
const resp = await createPipeline(p.id, p.pipeline);
createdPipelineIds.push(p.id);
if (resp.acknowledged !== true) {
throw resp;
}
}
createdPipelineId = pipelineId;
} else {
createdIndex = index;
createdPipelineId = pipelineId;
}
return {
success: true,
id,
index: createdIndex,
pipelineIds: createdPipelineIds,
};
} catch (error) {
return {
success: false,
id: id!,
index: createdIndex ?? '',
pipelineIds: createdPipelineIds,
error: error.body !== undefined ? error.body : error,
};
}
}
async function importData(
index: string,
ingestPipelineId: string | undefined,
data: InputData
): Promise<ImportResponse> {
const docCount = data.length;
const pipelineId = ingestPipelineId;
try {
let failures: ImportFailure[] = [];
if (data.length) {
const resp = await indexData(index, createdPipelineId, data);
const resp = await indexData(index, pipelineId, data);
if (resp.success === false) {
if (resp.ingestError) {
// all docs failed, abort
@ -77,18 +93,16 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
return {
success: true,
id,
index: createdIndex,
pipelineId: createdPipelineId,
index,
pipelineId,
docCount,
failures,
};
} catch (error) {
return {
success: false,
id: id!,
index: createdIndex,
pipelineId: createdPipelineId,
index,
pipelineId,
error: error.body !== undefined ? error.body : error,
docCount,
ingestError: error.ingestError,
@ -187,6 +201,7 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
}
return {
initializeImport,
importData,
};
}


@ -6,14 +6,8 @@
*/
import { schema } from '@kbn/config-schema';
import type { IScopedClusterClient } from '@kbn/core/server';
import type { CoreSetup, Logger } from '@kbn/core/server';
import type {
IndicesIndexSettings,
MappingTypeMapping,
} from '@elastic/elasticsearch/lib/api/types';
import { MAX_FILE_SIZE_BYTES, MAX_TIKA_FILE_SIZE_BYTES } from '../common/constants';
import type { IngestPipelineWrapper, InputData } from '../common/types';
import { wrapError } from './error_wrapper';
import { importDataProvider } from './import_data';
import { getTimeFieldRange } from './get_time_field_range';
@ -22,28 +16,15 @@ import { analyzeFile } from './analyze_file';
import { updateTelemetry } from './telemetry';
import {
importFileBodySchema,
importFileQuerySchema,
analyzeFileQuerySchema,
runtimeMappingsSchema,
initializeImportFileBodySchema,
} from './schemas';
import type { StartDeps } from './types';
import { checkFileUploadPrivileges } from './check_privileges';
import { previewIndexTimeRange } from './preview_index_time_range';
import { previewTikaContents } from './preview_tika_contents';
function importData(
client: IScopedClusterClient,
id: string | undefined,
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
ingestPipeline: IngestPipelineWrapper,
createPipelines: IngestPipelineWrapper[],
data: InputData
) {
const { importData: importDataFunc } = importDataProvider(client);
return importDataFunc(id, index, settings, mappings, ingestPipeline, createPipelines, data);
}
import { IngestPipelineWrapper } from '../common/types';
/**
* Routes for the file upload.
@ -143,6 +124,59 @@ export function fileUploadRoutes(coreSetup: CoreSetup<StartDeps, unknown>, logge
}
);
/**
* @apiGroup FileDataVisualizer
*
* @api {post} /internal/file_upload/initialize_import Initialize import file process
* @apiName InitializeImportFile
* @apiDescription Creates an index and ingest pipelines for importing file data.
*
* @apiSchema (body) initializeImportFileBodySchema
*/
router.versioned
.post({
path: '/internal/file_upload/initialize_import',
access: 'internal',
options: {
body: {
accepts: ['application/json'],
maxBytes: MAX_FILE_SIZE_BYTES,
},
},
})
.addVersion(
{
version: '1',
security: {
authz: {
enabled: false,
reason:
'This route is opted out from authorization because permissions will be checked by elasticsearch',
},
},
validate: {
request: {
body: initializeImportFileBodySchema,
},
},
},
async (context, request, response) => {
try {
const { index, settings, mappings, ingestPipelines } = request.body;
const esClient = (await context.core).elasticsearch.client;
await updateTelemetry();
const { initializeImport } = importDataProvider(esClient);
const result = await initializeImport(index, settings, mappings, ingestPipelines);
return response.ok({ body: result });
} catch (e) {
return response.customError(wrapError(e));
}
}
);
/**
* @apiGroup FileDataVisualizer
*
@ -150,7 +184,6 @@ export function fileUploadRoutes(coreSetup: CoreSetup<StartDeps, unknown>, logge
* @apiName ImportFile
* @apiDescription Imports file data into elasticsearch index.
*
* @apiSchema (query) importFileQuerySchema
* @apiSchema (body) importFileBodySchema
*/
router.versioned
@ -176,8 +209,33 @@ export function fileUploadRoutes(coreSetup: CoreSetup<StartDeps, unknown>, logge
},
validate: {
request: {
query: importFileQuerySchema,
body: importFileBodySchema,
query: schema.object({
id: schema.maybe(schema.string()),
}),
body: schema.object({
index: schema.string(),
data: schema.arrayOf(schema.any()),
settings: schema.maybe(schema.any()),
/** Mappings */
mappings: schema.any(),
/** Ingest pipeline definition */
ingestPipeline: schema.maybe(
schema.object({
id: schema.maybe(schema.string()),
pipeline: schema.maybe(schema.any()),
})
),
createPipelines: schema.maybe(
schema.arrayOf(
schema.maybe(
schema.object({
id: schema.maybe(schema.string()),
pipeline: schema.maybe(schema.any()),
})
)
)
),
}),
},
},
},
@ -187,24 +245,57 @@ export function fileUploadRoutes(coreSetup: CoreSetup<StartDeps, unknown>, logge
const { index, data, settings, mappings, ingestPipeline, createPipelines } = request.body;
const esClient = (await context.core).elasticsearch.client;
// `id` being `undefined` tells us that this is a new import due to create a new index.
// follow-up import calls to just add additional data will include the `id` of the created
// index, we'll ignore those and don't increment the counter.
const { initializeImport, importData } = importDataProvider(esClient);
if (id === undefined) {
await updateTelemetry();
const pipelines = [
...(ingestPipeline ? [ingestPipeline] : []),
...(createPipelines ?? []),
] as IngestPipelineWrapper[];
const result = await initializeImport(index, settings, mappings, pipelines);
// format the response to match v1 response
const body = {
id: 'tempId',
index: result.index,
pipelineId: result.pipelineIds[0],
success: result.success,
};
return response.ok({ body });
}
const result = await importData(
esClient,
id,
index,
settings,
mappings,
// @ts-expect-error
ingestPipeline,
createPipelines,
data
);
const result = await importData(index, ingestPipeline?.id ?? '', data);
return response.ok({ body: result });
} catch (e) {
return response.customError(wrapError(e));
}
}
)
.addVersion(
{
version: '2',
security: {
authz: {
enabled: false,
reason:
'This route is opted out from authorization because permissions will be checked by elasticsearch',
},
},
validate: {
request: {
body: importFileBodySchema,
},
},
},
async (context, request, response) => {
try {
const { index, data, ingestPipelineId } = request.body;
const esClient = (await context.core).elasticsearch.client;
const { importData } = importDataProvider(esClient);
const result = await importData(index, ingestPipelineId, data);
return response.ok({ body: result });
} catch (e) {
return response.customError(wrapError(e));


@ -26,26 +26,25 @@ export const analyzeFileQuerySchema = schema.object({
timestamp_format: schema.maybe(schema.string()),
});
export const importFileQuerySchema = schema.object({
id: schema.maybe(schema.string()),
const ingestPipeline = schema.object({
id: schema.string(),
pipeline: schema.maybe(schema.any()),
});
const ingestPipeline = schema.maybe(
schema.object({
id: schema.maybe(schema.string()),
pipeline: schema.maybe(schema.any()),
})
);
export const importFileBodySchema = schema.object({
export const initializeImportFileBodySchema = schema.object({
index: schema.string(),
data: schema.arrayOf(schema.any()),
/* Index settings */
settings: schema.maybe(schema.any()),
/** Mappings */
mappings: schema.any(),
/** Ingest pipeline definition */
ingestPipeline,
createPipelines: schema.maybe(schema.arrayOf(ingestPipeline)),
ingestPipelines: schema.arrayOf(ingestPipeline),
});
export const importFileBodySchema = schema.object({
index: schema.string(),
data: schema.arrayOf(schema.any()),
ingestPipelineId: schema.maybe(schema.string()),
});
export const runtimeMappingsSchema = schema.object(
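
For illustration, request bodies that would satisfy these schemas might look like this (hypothetical index, pipeline and field names):

// POST /internal/file_upload/initialize_import (version 1)
const initializeImportBody = {
  index: 'my-index',
  settings: { number_of_shards: 1 },
  mappings: { properties: { '@timestamp': { type: 'date' } } },
  ingestPipelines: [
    { id: 'my-index-0-pipeline', pipeline: { description: '', processors: [] } },
  ],
};

// POST /internal/file_upload/import (version 2)
const importBody = {
  index: 'my-index',
  data: [{ '@timestamp': '2025-02-26T10:39:30.000Z', message: 'example doc' }],
  ingestPipelineId: 'my-index-0-pipeline',
};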


@ -17459,7 +17459,6 @@
"xpack.fileUpload.geoUploadWizard.outOfTotalMsg": "sur {totalFeaturesCount}",
"xpack.fileUpload.geoUploadWizard.partialImportMsg": "Impossible d'indexer {failedFeaturesCount} {outOfTotalMsg} fonctionnalités.",
"xpack.fileUpload.geoUploadWizard.writingToIndex": "Écriture dans l'index : {progress} % terminé",
"xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage": "aucun ID ni index fournis",
"xpack.fileUpload.importComplete.copyButtonAriaLabel": "Copier dans le presse-papiers",
"xpack.fileUpload.importComplete.dataViewResponse": "Réponse de la vue de données",
"xpack.fileUpload.importComplete.indexingResponse": "Importer la réponse",


@ -17318,7 +17318,6 @@
"xpack.fileUpload.geoUploadWizard.outOfTotalMsg": "{totalFeaturesCount}件中",
"xpack.fileUpload.geoUploadWizard.partialImportMsg": "{failedFeaturesCount} {outOfTotalMsg}個の特徴量をインデックス化できません",
"xpack.fileUpload.geoUploadWizard.writingToIndex": "インデックスに書き込み中:{progress}%完了",
"xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage": "ID またはインデックスが提供されていません",
"xpack.fileUpload.importComplete.copyButtonAriaLabel": "クリップボードにコピー",
"xpack.fileUpload.importComplete.dataViewResponse": "データビュー応答",
"xpack.fileUpload.importComplete.indexingResponse": "応答をインポート",


@ -17042,7 +17042,6 @@
"xpack.fileUpload.geoUploadWizard.outOfTotalMsg": ",共 {totalFeaturesCount} 个",
"xpack.fileUpload.geoUploadWizard.partialImportMsg": "无法索引 {failedFeaturesCount} 个 {outOfTotalMsg} 特征。",
"xpack.fileUpload.geoUploadWizard.writingToIndex": "正在写入索引:已完成 {progress}%",
"xpack.fileUpload.import.noIdOrIndexSuppliedErrorMessage": "未提供任何 ID 或索引",
"xpack.fileUpload.importComplete.copyButtonAriaLabel": "复制到剪贴板",
"xpack.fileUpload.importComplete.dataViewResponse": "数据视图响应",
"xpack.fileUpload.importComplete.indexingResponse": "导入响应",