[Files] Limit concurrent uploads (server-side) (#139842)

* updated upload logic to respect aborts...

* added abort$ test

* create kbn-semaphore package

* remove duplicate sempahore implementation

* make sure that scheduling happens async

* limit concurrent upload requests

* fix copy-pasta error

* await the status update to avoid a race condition

* remove setImmediate

* move kbn-sempahore to kbn-std

* remove gettersetter pattern

* PR feedback about try catch block and made error message more clear

* typo
This commit is contained in:
Jean-Louis Leysens 2022-09-05 12:20:26 +02:00 committed by GitHub
parent 96a6d85beb
commit 01ecbd48f3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 202 additions and 50 deletions

View file

@ -27,3 +27,4 @@ export {
asyncForEach,
asyncForEachWithLimit,
} from './src/iteration';
export { Semaphore } from './src/semaphore';

View file

@ -1,12 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { TestScheduler } from 'rxjs/testing';
import { Semaphore } from '.';
import { Semaphore } from './semaphore';
describe('Semaphore', () => {
let testScheduler: TestScheduler;

View file

@ -1,8 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Observable } from 'rxjs';

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import type { SavedObject } from '@kbn/core/server';
import type { Observable } from 'rxjs';
import type { Readable } from 'stream';
import type { ES_FIXED_SIZE_INDEX_BLOB_STORE } from './constants';
@ -353,8 +354,9 @@ export interface File<Meta = unknown> {
* Stream file content to storage.
*
* @param content - The content to stream to storage.
* @param abort$ - An observable that can be used to abort the upload at any time.
*/
uploadContent(content: Readable): Promise<File<Meta>>;
uploadContent(content: Readable, abort$?: Observable<unknown>): Promise<File<Meta>>;
/**
* Stream file content from storage.

View file

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Readable } from 'stream';
import { promisify } from 'util';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { Semaphore } from '@kbn/std';
import { ElasticsearchBlobStorageClient } from './es';
const setImmediate = promisify(global.setImmediate);
describe('ElasticsearchBlobStorageClient', () => {
let esClient: ElasticsearchClient;
let blobStoreClient: ElasticsearchBlobStorageClient;
let semaphore: Semaphore;
beforeEach(() => {
semaphore = new Semaphore(1);
esClient = elasticsearchServiceMock.createElasticsearchClient();
blobStoreClient = new ElasticsearchBlobStorageClient(
esClient,
undefined,
undefined,
loggingSystemMock.createLogger(),
semaphore
);
});
test('limits max concurrent uploads', async () => {
const acquireSpy = jest.spyOn(semaphore, 'acquire');
(esClient.index as jest.Mock).mockImplementation(() => {
return new Promise((res, rej) => setTimeout(() => rej('failed'), 100));
});
const [p1, p2, ...rest] = [
blobStoreClient.upload(Readable.from(['test'])).catch(() => {}),
blobStoreClient.upload(Readable.from(['test'])).catch(() => {}),
blobStoreClient.upload(Readable.from(['test'])).catch(() => {}),
blobStoreClient.upload(Readable.from(['test'])).catch(() => {}),
];
await setImmediate();
expect(acquireSpy).toHaveBeenCalledTimes(4);
await p1;
expect(esClient.index).toHaveBeenCalledTimes(1);
await p2;
expect(esClient.index).toHaveBeenCalledTimes(2);
await Promise.all(rest);
expect(esClient.index).toHaveBeenCalledTimes(4);
});
});

View file

@ -9,9 +9,11 @@ import assert from 'assert';
import { once } from 'lodash';
import { errors } from '@elastic/elasticsearch';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import { Semaphore } from '@kbn/std';
import { Readable, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { promisify } from 'util';
import { lastValueFrom, defer } from 'rxjs';
import type { BlobStorageClient } from '../../types';
import type { ReadableContentStream } from './content_stream';
import { getReadableContentStream, getWritableContentStream } from './content_stream';
@ -27,16 +29,31 @@ export const BLOB_STORAGE_SYSTEM_INDEX_NAME = '.kibana_blob_storage';
export const MAX_BLOB_STORE_SIZE_BYTES = 50 * 1024 * 1024 * 1024; // 50 GiB
export class ElasticsearchBlobStorageClient implements BlobStorageClient {
private static defaultSemaphore: Semaphore;
/**
* Call this function once to globally set a concurrent upload limit for
* all {@link ElasticsearchBlobStorageClient} instances.
*/
public static configureConcurrentUpload(capacity: number) {
this.defaultSemaphore = new Semaphore(capacity);
}
constructor(
private readonly esClient: ElasticsearchClient,
private readonly index: string = BLOB_STORAGE_SYSTEM_INDEX_NAME,
private readonly chunkSize: undefined | string,
private readonly logger: Logger
private readonly logger: Logger,
/**
* Override the default concurrent upload limit by passing in a different
* semaphore
*/
private readonly uploadSemaphore = ElasticsearchBlobStorageClient.defaultSemaphore
) {
assert(
this.index.startsWith('.kibana'),
`Elasticsearch blob store index name must start with ".kibana", got ${this.index}.`
);
assert(this.uploadSemaphore, `No default semaphore provided and no semaphore was passed in.`);
}
/**
@ -81,28 +98,32 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
): Promise<{ id: string; size: number }> {
await this.createIndexIfNotExists();
try {
const dest = getWritableContentStream({
id,
client: this.esClient,
index: this.index,
logger: this.logger.get('content-stream-upload'),
parameters: {
maxChunkSize: this.chunkSize,
},
});
await pipeline.apply(null, [src, ...(transforms ?? []), dest] as unknown as Parameters<
typeof pipeline
>);
const processUpload = async () => {
try {
const dest = getWritableContentStream({
id,
client: this.esClient,
index: this.index,
logger: this.logger.get('content-stream-upload'),
parameters: {
maxChunkSize: this.chunkSize,
},
});
await pipeline.apply(null, [src, ...(transforms ?? []), dest] as unknown as Parameters<
typeof pipeline
>);
return {
id: dest.getContentReferenceId()!,
size: dest.getBytesWritten(),
};
} catch (e) {
this.logger.error(`Could not write chunks to Elasticsearch: ${e}`);
throw e;
}
return {
id: dest.getContentReferenceId()!,
size: dest.getBytesWritten(),
};
} catch (e) {
this.logger.error(`Could not write chunks to Elasticsearch for id ${id}: ${e}`);
throw e;
}
};
return lastValueFrom(defer(processUpload).pipe(this.uploadSemaphore.acquire()));
}
private getReadableContentStream(id: string, size?: number): ReadableContentStream {

View file

@ -24,6 +24,7 @@ describe('Elasticsearch blob storage', () => {
const sandbox = sinon.createSandbox();
beforeAll(async () => {
ElasticsearchBlobStorageClient.configureConcurrentUpload(Infinity);
const { startES, startKibana } = createTestServers({ adjustTimeout: jest.setTimeout });
manageES = await startES();
manageKbn = await startKibana();

View file

@ -4,7 +4,6 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import { BlobStorageSettings, ES_FIXED_SIZE_INDEX_BLOB_STORE } from '../../common';
import { BlobStorageClient } from './types';
@ -16,7 +15,14 @@ interface ElasticsearchBlobStorageSettings {
}
export class BlobStorageService {
constructor(private readonly esClient: ElasticsearchClient, private readonly logger: Logger) {}
/**
* The number of uploads per Kibana instance that can be running simultaneously
*/
private readonly concurrentUploadsToES = 20;
constructor(private readonly esClient: ElasticsearchClient, private readonly logger: Logger) {
ElasticsearchBlobStorageClient.configureConcurrentUpload(this.concurrentUploadsToES);
}
private createESBlobStorage({
index,

View file

@ -18,3 +18,4 @@ export class ContentAlreadyUploadedError extends FileError {}
export class NoDownloadAvailableError extends FileError {}
export class UploadInProgressError extends FileError {}
export class AlreadyDeletedError extends FileError {}
export class AbortedUploadError extends FileError {}

View file

@ -4,6 +4,7 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { of } from 'rxjs';
import type { ElasticsearchClient, ISavedObjectsRepository } from '@kbn/core/server';
import { createSandbox } from 'sinon';
import {
@ -92,4 +93,20 @@ describe('File', () => {
await file.uploadContent(Readable.from(['test']));
expect(file.data.status).toBe('READY');
});
it('sets file status and deletes content if aborted', async () => {
const createBlobSpy = sandbox.spy(blobStorageService, 'createBlobStorageClient');
const fileSO = { attributes: { Status: 'AWAITING_UPLOAD' } };
(soClient.create as jest.Mock).mockResolvedValue(fileSO);
(soClient.update as jest.Mock).mockResolvedValue(fileSO);
const file = await fileService.createFile({ name: 'test', fileKind });
const [{ returnValue: blobStore }] = createBlobSpy.getCalls();
const blobStoreSpy = sandbox.spy(blobStore, 'delete');
const abort$ = of('boom!');
await expect(file.uploadContent(Readable.from(['test']), abort$)).rejects.toThrow(/Abort/);
await setImmediate();
expect(file.data.status).toBe('UPLOAD_ERROR');
expect(blobStoreSpy.calledOnce).toBe(true);
});
});

View file

@ -7,15 +7,27 @@
import { Logger } from '@kbn/core/server';
import { Readable } from 'stream';
import {
map,
from,
race,
defer,
NEVER,
mergeMap,
catchError,
Observable,
lastValueFrom,
} from 'rxjs';
import type { FileShareJSON, FileShareJSONWithToken } from '../../common/types';
import type { File as IFile, UpdatableFileMetadata, FileJSON } from '../../common';
import { fileAttributesReducer, Action } from './file_attributes_reducer';
import type { FileClientImpl } from '../file_client/file_client';
import {
AbortedUploadError,
AlreadyDeletedError,
ContentAlreadyUploadedError,
NoDownloadAvailableError,
UploadInProgressError,
NoDownloadAvailableError,
ContentAlreadyUploadedError,
} from './errors';
/**
@ -57,7 +69,14 @@ export class File<M = unknown> implements IFile {
return this;
}
public async uploadContent(content: Readable): Promise<IFile<M>> {
private upload(content: Readable): Observable<{ size: number }> {
return defer(() => this.fileClient.upload(this.id, content));
}
public async uploadContent(
content: Readable,
abort$: Observable<unknown> = NEVER
): Promise<IFile<M>> {
if (this.uploadInProgress()) {
throw new UploadInProgressError('Upload already in progress.');
}
@ -65,22 +84,37 @@ export class File<M = unknown> implements IFile {
throw new ContentAlreadyUploadedError('Already uploaded file content.');
}
this.logger.debug(`Uploading file [id = ${this.id}][name = ${this.data.name}].`);
await this.updateFileState({
action: 'uploading',
});
try {
const { size } = await this.fileClient.upload(this.id, content);
await this.updateFileState({
action: 'uploaded',
payload: { size },
});
return this;
} catch (e) {
await this.updateFileState({ action: 'uploadError' });
this.fileClient.deleteContent(this.id).catch(() => {}); // Best effort to remove any uploaded content
throw e;
}
await lastValueFrom(
from(this.updateFileState({ action: 'uploading' })).pipe(
mergeMap(() =>
race(
this.upload(content),
abort$.pipe(
map(() => {
throw new AbortedUploadError(`Aborted upload of ${this.id}!`);
})
)
)
),
mergeMap(({ size }) => {
return this.updateFileState({ action: 'uploaded', payload: { size } });
}),
catchError(async (e) => {
try {
await this.updateFileState({ action: 'uploadError' });
} catch (updateError) {
this.logger.error(
`Could not update file ${this.id} after upload error (${e.message}). Update failed with: ${updateError.message}. This file may be in an inconsistent state.`
);
}
this.fileClient.deleteContent(this.id).catch(() => {});
throw e;
})
)
);
return this;
}
public downloadContent(): Promise<Readable> {

View file

@ -6,6 +6,7 @@
*/
import { schema, TypeOf } from '@kbn/config-schema';
import { ReplaySubject } from 'rxjs';
import type { Ensure } from '@kbn/utility-types';
import { Readable } from 'stream';
import type { FileKind } from '../../../common/types';
@ -32,6 +33,12 @@ export const handler: FileKindsRequestHandler<Params, unknown, Body> = async (
req,
res
) => {
// Ensure that we are listening to the abort stream as early as possible.
// In local testing I found that there is a chance for us to miss the abort event
// if we subscribe too late.
const abort$ = new ReplaySubject();
const sub = req.events.aborted$.subscribe(abort$);
const { fileService } = await files;
const {
body: stream,
@ -40,15 +47,20 @@ export const handler: FileKindsRequestHandler<Params, unknown, Body> = async (
const { error, result: file } = await getById(fileService.asCurrentUser(), id, fileKind);
if (error) return error;
try {
await file.uploadContent(stream as Readable);
await file.uploadContent(stream as Readable, abort$);
} catch (e) {
if (
e instanceof fileErrors.ContentAlreadyUploadedError ||
e instanceof fileErrors.UploadInProgressError
) {
return res.badRequest({ body: { message: e.message } });
} else if (e instanceof fileErrors.AbortedUploadError) {
fileService.logger.error(e);
return res.customError({ body: { message: e.message }, statusCode: 499 });
}
throw e;
} finally {
sub.unsubscribe();
}
const body: Response = { ok: true, size: file.data.size! };
return res.ok({ body });

View file

@ -9,6 +9,7 @@ import type { CloudSetup } from '@kbn/cloud-plugin/server';
import type { HttpServiceSetup, KibanaRequest, Logger, PackageInfo } from '@kbn/core/server';
import type { ExpressionAstExpression } from '@kbn/expressions-plugin/common';
import type { Optional } from '@kbn/utility-types';
import { Semaphore } from '@kbn/std';
import ipaddr from 'ipaddr.js';
import { defaultsDeep, sum } from 'lodash';
import { from, Observable, of, throwError } from 'rxjs';
@ -46,7 +47,6 @@ import { createLayout, Layout } from '../layouts';
import { EventLogger, Transactions } from './event_logger';
import type { ScreenshotObservableOptions, ScreenshotObservableResult } from './observable';
import { ScreenshotObservableHandler, UrlOrUrlWithContext } from './observable';
import { Semaphore } from './semaphore';
export type { ScreenshotObservableResult, UrlOrUrlWithContext } from './observable';