mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[SIEM][Detection Engine] Speeds up value list imports by enabling streaming of files.
## Summary * Changes the value list imports to use a streaming in model * Adds a custom light hand spun multi-part parser for the incoming text * Adds a buffer pause and resume which continues to buffer the incoming data if an async event such as creating a list from the attachment file needs to happen but does not emit the lines until the resume continues. * Adds a data slicing if the buffer becomes larger than the maximum so that if we begin buffering too quickly within memory we don't blow up the limit of Elastic Search. * Adds unit tests ### Checklist - [x] [Unit or functional tests](https://github.com/elastic/kibana/blob/master/CONTRIBUTING.md#cross-browser-compatibility) were updated or added to match the most common scenarios
This commit is contained in:
parent
1f5a1fe66c
commit
3863921616
18 changed files with 507 additions and 100 deletions
|
@ -41,6 +41,8 @@ export const OPERATOR = 'included';
|
|||
export const ENTRY_VALUE = 'some host name';
|
||||
export const MATCH = 'match';
|
||||
export const MATCH_ANY = 'match_any';
|
||||
export const MAX_IMPORT_PAYLOAD_BYTES = 40000000;
|
||||
export const IMPORT_BUFFER_SIZE = 1000;
|
||||
export const LIST = 'list';
|
||||
export const EXISTS = 'exists';
|
||||
export const NESTED = 'nested';
|
||||
|
|
27
x-pack/plugins/lists/server/config.mock.ts
Normal file
27
x-pack/plugins/lists/server/config.mock.ts
Normal file
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import {
|
||||
IMPORT_BUFFER_SIZE,
|
||||
LIST_INDEX,
|
||||
LIST_ITEM_INDEX,
|
||||
MAX_IMPORT_PAYLOAD_BYTES,
|
||||
} from '../common/constants.mock';
|
||||
|
||||
import { ConfigType } from './config';
|
||||
|
||||
export const getConfigMock = (): Partial<ConfigType> => ({
|
||||
listIndex: LIST_INDEX,
|
||||
listItemIndex: LIST_ITEM_INDEX,
|
||||
});
|
||||
|
||||
export const getConfigMockDecoded = (): ConfigType => ({
|
||||
enabled: true,
|
||||
importBufferSize: IMPORT_BUFFER_SIZE,
|
||||
listIndex: LIST_INDEX,
|
||||
listItemIndex: LIST_ITEM_INDEX,
|
||||
maxImportPayloadBytes: MAX_IMPORT_PAYLOAD_BYTES,
|
||||
});
|
64
x-pack/plugins/lists/server/config.test.ts
Normal file
64
x-pack/plugins/lists/server/config.test.ts
Normal file
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { ConfigSchema, ConfigType } from './config';
|
||||
import { getConfigMock, getConfigMockDecoded } from './config.mock';
|
||||
|
||||
describe('config_schema', () => {
|
||||
test('it works with expected basic mock data set and defaults', () => {
|
||||
expect(ConfigSchema.validate(getConfigMock())).toEqual(getConfigMockDecoded());
|
||||
});
|
||||
|
||||
test('it throws if given an invalid value', () => {
|
||||
const mock: Partial<ConfigType> & { madeUpValue: string } = {
|
||||
madeUpValue: 'something',
|
||||
...getConfigMock(),
|
||||
};
|
||||
expect(() => ConfigSchema.validate(mock)).toThrow(
|
||||
'[madeUpValue]: definition for this key is missing'
|
||||
);
|
||||
});
|
||||
|
||||
test('it throws if the "maxImportPayloadBytes" value is 0', () => {
|
||||
const mock: ConfigType = {
|
||||
...getConfigMockDecoded(),
|
||||
maxImportPayloadBytes: 0,
|
||||
};
|
||||
expect(() => ConfigSchema.validate(mock)).toThrow(
|
||||
'[maxImportPayloadBytes]: Value must be equal to or greater than [1].'
|
||||
);
|
||||
});
|
||||
|
||||
test('it throws if the "maxImportPayloadBytes" value is less than 0', () => {
|
||||
const mock: ConfigType = {
|
||||
...getConfigMockDecoded(),
|
||||
maxImportPayloadBytes: -1,
|
||||
};
|
||||
expect(() => ConfigSchema.validate(mock)).toThrow(
|
||||
'[maxImportPayloadBytes]: Value must be equal to or greater than [1].'
|
||||
);
|
||||
});
|
||||
|
||||
test('it throws if the "importBufferSize" value is 0', () => {
|
||||
const mock: ConfigType = {
|
||||
...getConfigMockDecoded(),
|
||||
importBufferSize: 0,
|
||||
};
|
||||
expect(() => ConfigSchema.validate(mock)).toThrow(
|
||||
'[importBufferSize]: Value must be equal to or greater than [1].'
|
||||
);
|
||||
});
|
||||
|
||||
test('it throws if the "importBufferSize" value is less than 0', () => {
|
||||
const mock: ConfigType = {
|
||||
...getConfigMockDecoded(),
|
||||
importBufferSize: -1,
|
||||
};
|
||||
expect(() => ConfigSchema.validate(mock)).toThrow(
|
||||
'[importBufferSize]: Value must be equal to or greater than [1].'
|
||||
);
|
||||
});
|
||||
});
|
|
@ -8,8 +8,10 @@ import { TypeOf, schema } from '@kbn/config-schema';
|
|||
|
||||
export const ConfigSchema = schema.object({
|
||||
enabled: schema.boolean({ defaultValue: true }),
|
||||
importBufferSize: schema.number({ defaultValue: 1000, min: 1 }),
|
||||
listIndex: schema.string({ defaultValue: '.lists' }),
|
||||
listItemIndex: schema.string({ defaultValue: '.items' }),
|
||||
maxImportPayloadBytes: schema.number({ defaultValue: 40000000, min: 1 }),
|
||||
});
|
||||
|
||||
export type ConfigType = TypeOf<typeof ConfigSchema>;
|
||||
|
|
|
@ -12,12 +12,6 @@ import { ConfigType } from './config';
|
|||
|
||||
export const createConfig$ = (
|
||||
context: PluginInitializerContext
|
||||
): Observable<
|
||||
Readonly<{
|
||||
enabled: boolean;
|
||||
listIndex: string;
|
||||
listItemIndex: string;
|
||||
}>
|
||||
> => {
|
||||
): Observable<Readonly<ConfigType>> => {
|
||||
return context.config.create<ConfigType>().pipe(map((config) => config));
|
||||
};
|
||||
|
|
|
@ -48,7 +48,7 @@ export class ListPlugin
|
|||
|
||||
core.http.registerRouteHandlerContext('lists', this.createRouteHandlerContext());
|
||||
const router = core.http.createRouter();
|
||||
initRoutes(router);
|
||||
initRoutes(router, config);
|
||||
|
||||
return {
|
||||
getExceptionListClient: (savedObjectsClient, user): ExceptionListClient => {
|
||||
|
|
|
@ -4,50 +4,40 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { Readable } from 'stream';
|
||||
|
||||
import { IRouter } from 'kibana/server';
|
||||
import { schema } from '@kbn/config-schema';
|
||||
|
||||
import { LIST_ITEM_URL } from '../../common/constants';
|
||||
import { buildRouteValidation, buildSiemResponse, transformError } from '../siem_server_deps';
|
||||
import { validate } from '../../common/siem_common_deps';
|
||||
import { importListItemQuerySchema, importListItemSchema, listSchema } from '../../common/schemas';
|
||||
import { importListItemQuerySchema, listSchema } from '../../common/schemas';
|
||||
import { ConfigType } from '../config';
|
||||
|
||||
import { createStreamFromBuffer } from './utils/create_stream_from_buffer';
|
||||
|
||||
import { getListClient } from '.';
|
||||
|
||||
export interface HapiReadableStream extends Readable {
|
||||
hapi: {
|
||||
filename: string;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Special interface since we are streaming in a file through a reader
|
||||
*/
|
||||
export interface ImportListItemHapiFileSchema {
|
||||
file: HapiReadableStream;
|
||||
}
|
||||
|
||||
export const importListItemRoute = (router: IRouter): void => {
|
||||
export const importListItemRoute = (router: IRouter, config: ConfigType): void => {
|
||||
router.post(
|
||||
{
|
||||
options: {
|
||||
body: {
|
||||
output: 'stream',
|
||||
accepts: ['multipart/form-data'],
|
||||
maxBytes: config.maxImportPayloadBytes,
|
||||
parse: false,
|
||||
},
|
||||
tags: ['access:lists'],
|
||||
},
|
||||
path: `${LIST_ITEM_URL}/_import`,
|
||||
validate: {
|
||||
body: buildRouteValidation<typeof importListItemSchema, ImportListItemHapiFileSchema>(
|
||||
importListItemSchema
|
||||
),
|
||||
body: schema.buffer(),
|
||||
query: buildRouteValidation(importListItemQuerySchema),
|
||||
},
|
||||
},
|
||||
async (context, request, response) => {
|
||||
const siemResponse = buildSiemResponse(response);
|
||||
try {
|
||||
const stream = createStreamFromBuffer(request.body);
|
||||
const { deserializer, list_id: listId, serializer, type } = request.query;
|
||||
const lists = getListClient(context);
|
||||
if (listId != null) {
|
||||
|
@ -63,7 +53,7 @@ export const importListItemRoute = (router: IRouter): void => {
|
|||
listId,
|
||||
meta: undefined,
|
||||
serializer: list.serializer,
|
||||
stream: request.body.file,
|
||||
stream,
|
||||
type: list.type,
|
||||
});
|
||||
|
||||
|
@ -74,26 +64,21 @@ export const importListItemRoute = (router: IRouter): void => {
|
|||
return response.ok({ body: validated ?? {} });
|
||||
}
|
||||
} else if (type != null) {
|
||||
const { filename } = request.body.file.hapi;
|
||||
// TODO: Should we prevent the same file from being uploaded multiple times?
|
||||
const list = await lists.createListIfItDoesNotExist({
|
||||
description: `File uploaded from file system of ${filename}`,
|
||||
const importedList = await lists.importListItemsToStream({
|
||||
deserializer,
|
||||
id: filename,
|
||||
listId: undefined,
|
||||
meta: undefined,
|
||||
name: filename,
|
||||
serializer,
|
||||
stream,
|
||||
type,
|
||||
});
|
||||
await lists.importListItemsToStream({
|
||||
deserializer: list.deserializer,
|
||||
listId: list.id,
|
||||
meta: undefined,
|
||||
serializer: list.serializer,
|
||||
stream: request.body.file,
|
||||
type: list.type,
|
||||
});
|
||||
const [validated, errors] = validate(list, listSchema);
|
||||
if (importedList == null) {
|
||||
return siemResponse.error({
|
||||
body: 'Unable to parse a valid fileName during import',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
const [validated, errors] = validate(importedList, listSchema);
|
||||
if (errors != null) {
|
||||
return siemResponse.error({ body: errors, statusCode: 500 });
|
||||
} else {
|
||||
|
|
|
@ -6,6 +6,8 @@
|
|||
|
||||
import { IRouter } from 'kibana/server';
|
||||
|
||||
import { ConfigType } from '../config';
|
||||
|
||||
import {
|
||||
createExceptionListItemRoute,
|
||||
createExceptionListRoute,
|
||||
|
@ -36,7 +38,7 @@ import {
|
|||
updateListRoute,
|
||||
} from '.';
|
||||
|
||||
export const initRoutes = (router: IRouter): void => {
|
||||
export const initRoutes = (router: IRouter, config: ConfigType): void => {
|
||||
// lists
|
||||
createListRoute(router);
|
||||
readListRoute(router);
|
||||
|
@ -52,7 +54,7 @@ export const initRoutes = (router: IRouter): void => {
|
|||
deleteListItemRoute(router);
|
||||
patchListItemRoute(router);
|
||||
exportListItemRoute(router);
|
||||
importListItemRoute(router);
|
||||
importListItemRoute(router, config);
|
||||
findListItemRoute(router);
|
||||
|
||||
// indexes of lists
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import { Readable } from 'stream';
|
||||
|
||||
export const createStreamFromBuffer = (buffer: Buffer): Readable => {
|
||||
const stream = new Readable();
|
||||
stream.push(buffer);
|
||||
stream.push(null);
|
||||
return stream;
|
||||
};
|
|
@ -4,15 +4,44 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { IMPORT_BUFFER_SIZE } from '../../../common/constants.mock';
|
||||
|
||||
import { BufferLines } from './buffer_lines';
|
||||
import { TestReadable } from './test_readable.mock';
|
||||
|
||||
describe('buffer_lines', () => {
|
||||
test('it will throw if given a buffer size of zero', () => {
|
||||
expect(() => {
|
||||
new BufferLines({ bufferSize: 0, input: new TestReadable() });
|
||||
}).toThrow('bufferSize must be greater than zero');
|
||||
});
|
||||
|
||||
test('it will throw if given a buffer size of -1', () => {
|
||||
expect(() => {
|
||||
new BufferLines({ bufferSize: -1, input: new TestReadable() });
|
||||
}).toThrow('bufferSize must be greater than zero');
|
||||
});
|
||||
|
||||
test('it can read a single line', (done) => {
|
||||
const input = new TestReadable();
|
||||
input.push('line one\n');
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ input });
|
||||
const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input });
|
||||
let linesToTest: string[] = [];
|
||||
bufferedLine.on('lines', (lines: string[]) => {
|
||||
linesToTest = [...linesToTest, ...lines];
|
||||
});
|
||||
bufferedLine.on('close', () => {
|
||||
expect(linesToTest).toEqual(['line one']);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
test('it can read a single line using a buffer size of 1', (done) => {
|
||||
const input = new TestReadable();
|
||||
input.push('line one\n');
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ bufferSize: 1, input });
|
||||
let linesToTest: string[] = [];
|
||||
bufferedLine.on('lines', (lines: string[]) => {
|
||||
linesToTest = [...linesToTest, ...lines];
|
||||
|
@ -28,7 +57,23 @@ describe('buffer_lines', () => {
|
|||
input.push('line one\n');
|
||||
input.push('line two\n');
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ input });
|
||||
const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input });
|
||||
let linesToTest: string[] = [];
|
||||
bufferedLine.on('lines', (lines: string[]) => {
|
||||
linesToTest = [...linesToTest, ...lines];
|
||||
});
|
||||
bufferedLine.on('close', () => {
|
||||
expect(linesToTest).toEqual(['line one', 'line two']);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
test('it can read two lines using a buffer size of 1', (done) => {
|
||||
const input = new TestReadable();
|
||||
input.push('line one\n');
|
||||
input.push('line two\n');
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ bufferSize: 1, input });
|
||||
let linesToTest: string[] = [];
|
||||
bufferedLine.on('lines', (lines: string[]) => {
|
||||
linesToTest = [...linesToTest, ...lines];
|
||||
|
@ -44,7 +89,7 @@ describe('buffer_lines', () => {
|
|||
input.push('line one\n');
|
||||
input.push('line one\n');
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ input });
|
||||
const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input });
|
||||
let linesToTest: string[] = [];
|
||||
bufferedLine.on('lines', (lines: string[]) => {
|
||||
linesToTest = [...linesToTest, ...lines];
|
||||
|
@ -58,7 +103,7 @@ describe('buffer_lines', () => {
|
|||
test('it can close out without writing any lines', (done) => {
|
||||
const input = new TestReadable();
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ input });
|
||||
const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input });
|
||||
let linesToTest: string[] = [];
|
||||
bufferedLine.on('lines', (lines: string[]) => {
|
||||
linesToTest = [...linesToTest, ...lines];
|
||||
|
@ -71,7 +116,7 @@ describe('buffer_lines', () => {
|
|||
|
||||
test('it can read 200 lines', (done) => {
|
||||
const input = new TestReadable();
|
||||
const bufferedLine = new BufferLines({ input });
|
||||
const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input });
|
||||
let linesToTest: string[] = [];
|
||||
const size200: string[] = new Array(200).fill(null).map((_, index) => `${index}\n`);
|
||||
size200.forEach((element) => input.push(element));
|
||||
|
@ -84,4 +129,66 @@ describe('buffer_lines', () => {
|
|||
done();
|
||||
});
|
||||
});
|
||||
|
||||
test('it can read an example multi-part message', (done) => {
|
||||
const input = new TestReadable();
|
||||
input.push('--boundary\n');
|
||||
input.push('Content-type: text/plain\n');
|
||||
input.push('Content-Disposition: form-data; name="fieldName"; filename="filename.text"\n');
|
||||
input.push('\n');
|
||||
input.push('127.0.0.1\n');
|
||||
input.push('127.0.0.2\n');
|
||||
input.push('127.0.0.3\n');
|
||||
input.push('\n');
|
||||
input.push('--boundary--\n');
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input });
|
||||
let linesToTest: string[] = [];
|
||||
bufferedLine.on('lines', (lines: string[]) => {
|
||||
linesToTest = [...linesToTest, ...lines];
|
||||
});
|
||||
bufferedLine.on('close', () => {
|
||||
expect(linesToTest).toEqual(['127.0.0.1', '127.0.0.2', '127.0.0.3']);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
test('it can read an empty multi-part message', (done) => {
|
||||
const input = new TestReadable();
|
||||
input.push('--boundary\n');
|
||||
input.push('Content-type: text/plain\n');
|
||||
input.push('Content-Disposition: form-data; name="fieldName"; filename="filename.text"\n');
|
||||
input.push('\n');
|
||||
input.push('\n');
|
||||
input.push('--boundary--\n');
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input });
|
||||
let linesToTest: string[] = [];
|
||||
bufferedLine.on('lines', (lines: string[]) => {
|
||||
linesToTest = [...linesToTest, ...lines];
|
||||
});
|
||||
bufferedLine.on('close', () => {
|
||||
expect(linesToTest).toEqual([]);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
test('it can read a fileName from a multipart message', (done) => {
|
||||
const input = new TestReadable();
|
||||
input.push('--boundary\n');
|
||||
input.push('Content-type: text/plain\n');
|
||||
input.push('Content-Disposition: form-data; name="fieldName"; filename="filename.text"\n');
|
||||
input.push('\n');
|
||||
input.push('--boundary--\n');
|
||||
input.push(null);
|
||||
const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input });
|
||||
let fileNameToTest: string;
|
||||
bufferedLine.on('fileName', (fileName: string) => {
|
||||
fileNameToTest = fileName;
|
||||
});
|
||||
bufferedLine.on('close', () => {
|
||||
expect(fileNameToTest).toEqual('filename.text');
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -7,18 +7,50 @@
|
|||
import readLine from 'readline';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
const BUFFER_SIZE = 100;
|
||||
|
||||
export class BufferLines extends Readable {
|
||||
private set = new Set<string>();
|
||||
constructor({ input }: { input: NodeJS.ReadableStream }) {
|
||||
private boundary: string | null = null;
|
||||
private readableText: boolean = false;
|
||||
private paused: boolean = false;
|
||||
private bufferSize: number;
|
||||
constructor({ input, bufferSize }: { input: NodeJS.ReadableStream; bufferSize: number }) {
|
||||
super({ encoding: 'utf-8' });
|
||||
if (bufferSize <= 0) {
|
||||
throw new RangeError('bufferSize must be greater than zero');
|
||||
}
|
||||
this.bufferSize = bufferSize;
|
||||
|
||||
const readline = readLine.createInterface({
|
||||
input,
|
||||
});
|
||||
|
||||
// We are parsing multipart/form-data involving boundaries as fast as we can to get
|
||||
// * The filename if it exists and emit it
|
||||
// * The actual content within the multipart/form-data
|
||||
readline.on('line', (line) => {
|
||||
this.push(line);
|
||||
if (this.boundary == null && line.startsWith('--')) {
|
||||
this.boundary = `${line}--`;
|
||||
} else if (this.boundary != null && !this.readableText && line.trim() !== '') {
|
||||
if (line.startsWith('Content-Disposition')) {
|
||||
const fileNameMatch = RegExp('filename="(?<fileName>.+)"');
|
||||
const matches = fileNameMatch.exec(line);
|
||||
if (matches?.groups?.fileName != null) {
|
||||
this.emit('fileName', matches.groups.fileName);
|
||||
}
|
||||
}
|
||||
} else if (this.boundary != null && !this.readableText && line.trim() === '') {
|
||||
// we are ready to be readable text now for parsing
|
||||
this.readableText = true;
|
||||
} else if (this.readableText && line.trim() === '') {
|
||||
// skip and do nothing as this is either a empty line or an upcoming end is about to happen
|
||||
} else if (this.boundary != null && this.readableText && line === this.boundary) {
|
||||
// we are at the end of the stream
|
||||
this.boundary = null;
|
||||
this.readableText = false;
|
||||
} else {
|
||||
// we have actual content to push
|
||||
this.push(line);
|
||||
}
|
||||
});
|
||||
|
||||
readline.on('close', () => {
|
||||
|
@ -26,23 +58,54 @@ export class BufferLines extends Readable {
|
|||
});
|
||||
}
|
||||
|
||||
public _read(): void {
|
||||
// No operation but this is required to be implemented
|
||||
public _read(): void {}
|
||||
|
||||
public pause(): this {
|
||||
this.paused = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public resume(): this {
|
||||
this.paused = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
private emptyBuffer(): void {
|
||||
const arrayFromSet = Array.from(this.set);
|
||||
if (arrayFromSet.length === 0) {
|
||||
this.emit('lines', []);
|
||||
} else {
|
||||
while (arrayFromSet.length) {
|
||||
const spliced = arrayFromSet.splice(0, this.bufferSize);
|
||||
this.emit('lines', spliced);
|
||||
}
|
||||
}
|
||||
this.set.clear();
|
||||
}
|
||||
|
||||
public push(line: string | null): boolean {
|
||||
if (line == null) {
|
||||
this.emit('lines', Array.from(this.set));
|
||||
this.set.clear();
|
||||
this.emit('close');
|
||||
return true;
|
||||
} else {
|
||||
if (line != null) {
|
||||
this.set.add(line);
|
||||
if (this.set.size > BUFFER_SIZE) {
|
||||
this.emit('lines', Array.from(this.set));
|
||||
this.set.clear();
|
||||
return true;
|
||||
if (this.paused) {
|
||||
return false;
|
||||
} else {
|
||||
if (this.set.size > this.bufferSize) {
|
||||
this.emptyBuffer();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (this.paused) {
|
||||
// If we paused but have buffered all of the available data
|
||||
// we should do wait for 10(ms) and check again if we are paused
|
||||
// or not.
|
||||
setTimeout(() => {
|
||||
this.push(line);
|
||||
}, 10);
|
||||
return false;
|
||||
} else {
|
||||
this.emptyBuffer();
|
||||
this.emit('close');
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,9 +80,12 @@ export const createListItemsBulk = async ({
|
|||
},
|
||||
[]
|
||||
);
|
||||
|
||||
await callCluster('bulk', {
|
||||
body,
|
||||
index: listItemIndex,
|
||||
});
|
||||
try {
|
||||
await callCluster('bulk', {
|
||||
body,
|
||||
index: listItemIndex,
|
||||
});
|
||||
} catch (error) {
|
||||
// TODO: Log out the error with return values from the bulk insert into another index or saved object
|
||||
}
|
||||
};
|
||||
|
|
|
@ -5,14 +5,24 @@
|
|||
*/
|
||||
import { getCallClusterMock } from '../../../common/get_call_cluster.mock';
|
||||
import { ImportListItemsToStreamOptions, WriteBufferToItemsOptions } from '../items';
|
||||
import { LIST_ID, LIST_ITEM_INDEX, META, TYPE, USER } from '../../../common/constants.mock';
|
||||
import {
|
||||
LIST_ID,
|
||||
LIST_INDEX,
|
||||
LIST_ITEM_INDEX,
|
||||
META,
|
||||
TYPE,
|
||||
USER,
|
||||
} from '../../../common/constants.mock';
|
||||
import { getConfigMockDecoded } from '../../config.mock';
|
||||
|
||||
import { TestReadable } from './test_readable.mock';
|
||||
|
||||
export const getImportListItemsToStreamOptionsMock = (): ImportListItemsToStreamOptions => ({
|
||||
callCluster: getCallClusterMock(),
|
||||
config: getConfigMockDecoded(),
|
||||
deserializer: undefined,
|
||||
listId: LIST_ID,
|
||||
listIndex: LIST_INDEX,
|
||||
listItemIndex: LIST_ITEM_INDEX,
|
||||
meta: META,
|
||||
serializer: undefined,
|
||||
|
|
|
@ -8,20 +8,26 @@ import { Readable } from 'stream';
|
|||
|
||||
import { LegacyAPICaller } from 'kibana/server';
|
||||
|
||||
import { createListIfItDoesNotExist } from '../lists/create_list_if_it_does_not_exist';
|
||||
import {
|
||||
DeserializerOrUndefined,
|
||||
ListIdOrUndefined,
|
||||
ListSchema,
|
||||
MetaOrUndefined,
|
||||
SerializerOrUndefined,
|
||||
Type,
|
||||
} from '../../../common/schemas';
|
||||
import { ConfigType } from '../../config';
|
||||
|
||||
import { BufferLines } from './buffer_lines';
|
||||
import { createListItemsBulk } from './create_list_items_bulk';
|
||||
|
||||
export interface ImportListItemsToStreamOptions {
|
||||
listId: ListIdOrUndefined;
|
||||
config: ConfigType;
|
||||
listIndex: string;
|
||||
deserializer: DeserializerOrUndefined;
|
||||
serializer: SerializerOrUndefined;
|
||||
listId: string;
|
||||
stream: Readable;
|
||||
callCluster: LegacyAPICaller;
|
||||
listItemIndex: string;
|
||||
|
@ -31,34 +37,72 @@ export interface ImportListItemsToStreamOptions {
|
|||
}
|
||||
|
||||
export const importListItemsToStream = ({
|
||||
config,
|
||||
deserializer,
|
||||
serializer,
|
||||
listId,
|
||||
stream,
|
||||
callCluster,
|
||||
listItemIndex,
|
||||
listIndex,
|
||||
type,
|
||||
user,
|
||||
meta,
|
||||
}: ImportListItemsToStreamOptions): Promise<void> => {
|
||||
return new Promise<void>((resolve) => {
|
||||
const readBuffer = new BufferLines({ input: stream });
|
||||
}: ImportListItemsToStreamOptions): Promise<ListSchema | null> => {
|
||||
return new Promise<ListSchema | null>((resolve) => {
|
||||
const readBuffer = new BufferLines({ bufferSize: config.importBufferSize, input: stream });
|
||||
let fileName: string | undefined;
|
||||
let list: ListSchema | null = null;
|
||||
readBuffer.on('fileName', async (fileNameEmitted: string) => {
|
||||
readBuffer.pause();
|
||||
fileName = fileNameEmitted;
|
||||
if (listId == null) {
|
||||
list = await createListIfItDoesNotExist({
|
||||
callCluster,
|
||||
description: `File uploaded from file system of ${fileNameEmitted}`,
|
||||
deserializer,
|
||||
id: fileNameEmitted,
|
||||
listIndex,
|
||||
meta,
|
||||
name: fileNameEmitted,
|
||||
serializer,
|
||||
type,
|
||||
user,
|
||||
});
|
||||
}
|
||||
readBuffer.resume();
|
||||
});
|
||||
|
||||
readBuffer.on('lines', async (lines: string[]) => {
|
||||
await writeBufferToItems({
|
||||
buffer: lines,
|
||||
callCluster,
|
||||
deserializer,
|
||||
listId,
|
||||
listItemIndex,
|
||||
meta,
|
||||
serializer,
|
||||
type,
|
||||
user,
|
||||
});
|
||||
if (listId != null) {
|
||||
await writeBufferToItems({
|
||||
buffer: lines,
|
||||
callCluster,
|
||||
deserializer,
|
||||
listId,
|
||||
listItemIndex,
|
||||
meta,
|
||||
serializer,
|
||||
type,
|
||||
user,
|
||||
});
|
||||
} else if (fileName != null) {
|
||||
await writeBufferToItems({
|
||||
buffer: lines,
|
||||
callCluster,
|
||||
deserializer,
|
||||
listId: fileName,
|
||||
listItemIndex,
|
||||
meta,
|
||||
serializer,
|
||||
type,
|
||||
user,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
readBuffer.on('close', () => {
|
||||
resolve();
|
||||
resolve(list);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { LegacyAPICaller } from 'kibana/server';
|
||||
|
||||
import {
|
||||
Description,
|
||||
DeserializerOrUndefined,
|
||||
Id,
|
||||
ListSchema,
|
||||
MetaOrUndefined,
|
||||
Name,
|
||||
SerializerOrUndefined,
|
||||
Type,
|
||||
} from '../../../common/schemas';
|
||||
|
||||
import { getList } from './get_list';
|
||||
import { createList } from './create_list';
|
||||
|
||||
export interface CreateListIfItDoesNotExistOptions {
|
||||
id: Id;
|
||||
type: Type;
|
||||
name: Name;
|
||||
deserializer: DeserializerOrUndefined;
|
||||
serializer: SerializerOrUndefined;
|
||||
description: Description;
|
||||
callCluster: LegacyAPICaller;
|
||||
listIndex: string;
|
||||
user: string;
|
||||
meta: MetaOrUndefined;
|
||||
dateNow?: string;
|
||||
tieBreaker?: string;
|
||||
}
|
||||
|
||||
export const createListIfItDoesNotExist = async ({
|
||||
id,
|
||||
name,
|
||||
type,
|
||||
description,
|
||||
deserializer,
|
||||
callCluster,
|
||||
listIndex,
|
||||
user,
|
||||
meta,
|
||||
serializer,
|
||||
dateNow,
|
||||
tieBreaker,
|
||||
}: CreateListIfItDoesNotExistOptions): Promise<ListSchema> => {
|
||||
const list = await getList({ callCluster, id, listIndex });
|
||||
if (list == null) {
|
||||
return createList({
|
||||
callCluster,
|
||||
dateNow,
|
||||
description,
|
||||
deserializer,
|
||||
id,
|
||||
listIndex,
|
||||
meta,
|
||||
name,
|
||||
serializer,
|
||||
tieBreaker,
|
||||
type,
|
||||
user,
|
||||
});
|
||||
} else {
|
||||
return list;
|
||||
}
|
||||
};
|
|
@ -9,7 +9,12 @@ import { getFoundListSchemaMock } from '../../../common/schemas/response/found_l
|
|||
import { getListItemResponseMock } from '../../../common/schemas/response/list_item_schema.mock';
|
||||
import { getListResponseMock } from '../../../common/schemas/response/list_schema.mock';
|
||||
import { getCallClusterMock } from '../../../common/get_call_cluster.mock';
|
||||
import { LIST_INDEX, LIST_ITEM_INDEX } from '../../../common/constants.mock';
|
||||
import {
|
||||
IMPORT_BUFFER_SIZE,
|
||||
LIST_INDEX,
|
||||
LIST_ITEM_INDEX,
|
||||
MAX_IMPORT_PAYLOAD_BYTES,
|
||||
} from '../../../common/constants.mock';
|
||||
|
||||
import { ListClient } from './list_client';
|
||||
|
||||
|
@ -59,8 +64,10 @@ export const getListClientMock = (): ListClient => {
|
|||
callCluster: getCallClusterMock(),
|
||||
config: {
|
||||
enabled: true,
|
||||
importBufferSize: IMPORT_BUFFER_SIZE,
|
||||
listIndex: LIST_INDEX,
|
||||
listItemIndex: LIST_ITEM_INDEX,
|
||||
maxImportPayloadBytes: MAX_IMPORT_PAYLOAD_BYTES,
|
||||
},
|
||||
spaceId: 'default',
|
||||
user: 'elastic',
|
||||
|
|
|
@ -70,6 +70,7 @@ import {
|
|||
UpdateListItemOptions,
|
||||
UpdateListOptions,
|
||||
} from './list_client_types';
|
||||
import { createListIfItDoesNotExist } from './create_list_if_it_does_not_exist';
|
||||
|
||||
export class ListClient {
|
||||
private readonly spaceId: string;
|
||||
|
@ -140,12 +141,20 @@ export class ListClient {
|
|||
type,
|
||||
meta,
|
||||
}: CreateListIfItDoesNotExistOptions): Promise<ListSchema> => {
|
||||
const list = await this.getList({ id });
|
||||
if (list == null) {
|
||||
return this.createList({ description, deserializer, id, meta, name, serializer, type });
|
||||
} else {
|
||||
return list;
|
||||
}
|
||||
const { callCluster, user } = this;
|
||||
const listIndex = this.getListIndex();
|
||||
return createListIfItDoesNotExist({
|
||||
callCluster,
|
||||
description,
|
||||
deserializer,
|
||||
id,
|
||||
listIndex,
|
||||
meta,
|
||||
name,
|
||||
serializer,
|
||||
type,
|
||||
user,
|
||||
});
|
||||
};
|
||||
|
||||
public getListIndexExists = async (): Promise<boolean> => {
|
||||
|
@ -325,13 +334,16 @@ export class ListClient {
|
|||
listId,
|
||||
stream,
|
||||
meta,
|
||||
}: ImportListItemsToStreamOptions): Promise<void> => {
|
||||
const { callCluster, user } = this;
|
||||
}: ImportListItemsToStreamOptions): Promise<ListSchema | null> => {
|
||||
const { callCluster, user, config } = this;
|
||||
const listItemIndex = this.getListItemIndex();
|
||||
const listIndex = this.getListIndex();
|
||||
return importListItemsToStream({
|
||||
callCluster,
|
||||
config,
|
||||
deserializer,
|
||||
listId,
|
||||
listIndex,
|
||||
listItemIndex,
|
||||
meta,
|
||||
serializer,
|
||||
|
|
|
@ -16,6 +16,7 @@ import {
|
|||
Id,
|
||||
IdOrUndefined,
|
||||
ListId,
|
||||
ListIdOrUndefined,
|
||||
MetaOrUndefined,
|
||||
Name,
|
||||
NameOrUndefined,
|
||||
|
@ -86,9 +87,9 @@ export interface ExportListItemsToStreamOptions {
|
|||
}
|
||||
|
||||
export interface ImportListItemsToStreamOptions {
|
||||
listId: ListIdOrUndefined;
|
||||
deserializer: DeserializerOrUndefined;
|
||||
serializer: SerializerOrUndefined;
|
||||
listId: string;
|
||||
type: Type;
|
||||
stream: Readable;
|
||||
meta: MetaOrUndefined;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue