[Files] Add support for using ES Data Streams for files (metadata + blob storage) (#160437)

## Summary

Adds support to the Files plugin for DataStreams. Changes include:

- use `op_type` of `create` on all document creations
- When the `indexIsAlias` option is used:
- check for index existence will not be done, thus Index will NOT be
automatically created if it does not yet exist
- `@timestamp` top-level document property will be written for the
metadata as well as each file chunk
- Fixes `ElasticsearchBlobStorageClient.createIndexIfNotExists()` to
ensure it is executed no more than once per index name
- added `.catch(wrapErrorAndReThrow)` to several `esClient` calls in
order to get better stack traces for failures


### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Paul Tavares 2023-06-30 09:25:22 -04:00 committed by GitHub
parent 44863837a8
commit a458b99523
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 411 additions and 94 deletions

View file

@ -15,6 +15,8 @@ import { ContentStream, ContentStreamEncoding, ContentStreamParameters } from '.
import type { GetResponse } from '@elastic/elasticsearch/lib/api/types';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { FileDocument } from '../../../../file_client/file_metadata_client/adapters/es_index';
import * as cborx from 'cbor-x';
import { IndexRequest } from '@elastic/elasticsearch/lib/api/types';
describe('ContentStream', () => {
let client: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
@ -282,12 +284,14 @@ describe('ContentStream', () => {
});
it('should emit an error event', async () => {
client.index.mockRejectedValueOnce('some error');
client.index.mockRejectedValueOnce(new Error('some error'));
stream.end('data');
const error = await new Promise((resolve) => stream.once('error', resolve));
expect(error).toBe('some error');
expect((error as Error).toString()).toEqual(
'FilesPluginError: ContentStream.indexChunk(): some error'
);
});
it('should remove all previous chunks before writing', async () => {
@ -405,5 +409,15 @@ describe('ContentStream', () => {
expect(deleteRequest).toHaveProperty('query.bool.must.match.bid', 'something');
});
it('should write @timestamp if `indexIsAlias` is true', async () => {
stream = new ContentStream(client, undefined, 'somewhere', logger, undefined, true);
stream.end('some data');
await new Promise((resolve) => stream.once('finish', resolve));
const docBuffer = (client.index.mock.calls[0][0] as IndexRequest).document as Buffer;
const docData = cborx.decode(docBuffer);
expect(docData).toHaveProperty('@timestamp');
});
});
});

View file

@ -16,6 +16,7 @@ import { Duplex, Writable, Readable } from 'stream';
import { GetResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { inspect } from 'util';
import { wrapErrorAndReThrow } from '../../../../file_client/utils';
import type { FileChunkDocument } from '../mappings';
type Callback = (error?: Error) => void;
@ -238,27 +239,29 @@ export class ContentStream extends Duplex {
}
private async indexChunk({ bid, data, id, index }: IndexRequestParams, last?: true) {
await this.client.index(
{
id,
index,
document: cborx.encode(
last
? {
data,
bid,
last,
}
: { data, bid }
),
},
{
headers: {
'content-type': 'application/cbor',
accept: 'application/json',
await this.client
.index(
{
id,
index,
op_type: 'create',
document: cborx.encode({
data,
bid,
// Mark it as last?
...(last ? { last } : {}),
// Add `@timestamp` for Index Alias/DS?
...(this.indexIsAlias ? { '@timestamp': new Date().toISOString() } : {}),
}),
},
}
);
{
headers: {
'content-type': 'application/cbor',
accept: 'application/json',
},
}
)
.catch(wrapErrorAndReThrow.withMessagePrefix('ContentStream.indexChunk(): '));
}
/**

View file

@ -8,35 +8,51 @@
import { Readable } from 'stream';
import { promisify } from 'util';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { Semaphore } from '@kbn/std';
import { ElasticsearchBlobStorageClient } from './es';
import { errors } from '@elastic/elasticsearch';
const setImmediate = promisify(global.setImmediate);
describe('ElasticsearchBlobStorageClient', () => {
let esClient: ElasticsearchClient;
let blobStoreClient: ElasticsearchBlobStorageClient;
let esClient: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
let semaphore: Semaphore;
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
// Exposed `clearCache()` which resets the cache for the memoized `createIndexIfNotExists()` method
class ElasticsearchBlobStorageClientWithCacheClear extends ElasticsearchBlobStorageClient {
static clearCache() {
// @ts-expect-error TS2722: Cannot invoke an object which is possibly 'undefined' (??)
this.createIndexIfNotExists.cache.clear();
}
}
const createBlobStoreClient = (index?: string, indexIsAlias: boolean = false) => {
ElasticsearchBlobStorageClientWithCacheClear.clearCache();
return new ElasticsearchBlobStorageClientWithCacheClear(
esClient,
index,
undefined,
logger,
semaphore,
indexIsAlias
);
};
beforeEach(() => {
semaphore = new Semaphore(1);
logger = loggingSystemMock.createLogger();
esClient = elasticsearchServiceMock.createElasticsearchClient();
blobStoreClient = new ElasticsearchBlobStorageClient(
esClient,
undefined,
undefined,
loggingSystemMock.createLogger(),
semaphore
);
});
test('limits max concurrent uploads', async () => {
const blobStoreClient = createBlobStoreClient();
const acquireSpy = jest.spyOn(semaphore, 'acquire');
(esClient.index as jest.Mock).mockImplementation(() => {
esClient.index.mockImplementation(() => {
return new Promise((res, rej) => setTimeout(() => rej('failed'), 100));
});
const [p1, p2, ...rest] = [
@ -54,4 +70,59 @@ describe('ElasticsearchBlobStorageClient', () => {
await Promise.all(rest);
expect(esClient.index).toHaveBeenCalledTimes(4);
});
describe('.createIndexIfNotExists()', () => {
let data: Readable;
beforeEach(() => {
data = Readable.from(['test']);
});
it('should create index if it does not exist', async () => {
esClient.indices.exists.mockResolvedValue(false);
const blobStoreClient = await createBlobStoreClient('foo1');
await blobStoreClient.upload(data);
expect(logger.info).toHaveBeenCalledWith(
'Creating [foo1] index for Elasticsearch blob store.'
);
// Calling a second time should do nothing
logger.info.mockClear();
await blobStoreClient.upload(data);
expect(logger.info).not.toHaveBeenCalledWith(
'Creating [foo1] index for Elasticsearch blob store.'
);
});
it('should not create index if it already exists', async () => {
esClient.indices.exists.mockResolvedValue(true);
await createBlobStoreClient('foo1').upload(data);
expect(logger.debug).toHaveBeenCalledWith('[foo1] already exists. Nothing to do');
});
it('should not create index if `indexIsAlias` is `true`', async () => {
await createBlobStoreClient('foo1', true).upload(data);
expect(logger.debug).toHaveBeenCalledWith(
'No need to create index [foo1] as it is an Alias or DS.'
);
});
it('should not reject if it is unable to create the index (best effort)', async () => {
esClient.indices.exists.mockResolvedValue(false);
esClient.indices.create.mockRejectedValue(
new errors.ResponseError({
statusCode: 400,
} as ConstructorParameters<typeof errors.ResponseError>[0])
);
await createBlobStoreClient('foo1', false).upload(data);
expect(logger.warn).toHaveBeenCalledWith(
'Unable to create blob storage index [foo1], it may have been created already.'
);
});
});
});

View file

@ -7,7 +7,6 @@
*/
import assert from 'assert';
import { once } from 'lodash';
import { errors } from '@elastic/elasticsearch';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import { Semaphore } from '@kbn/std';
@ -16,6 +15,7 @@ import { pipeline } from 'stream/promises';
import { promisify } from 'util';
import { lastValueFrom, defer } from 'rxjs';
import { PerformanceMetricEvent, reportPerformanceMetricEvent } from '@kbn/ebt-tools';
import { memoize } from 'lodash';
import { FilesPlugin } from '../../../plugin';
import { FILE_UPLOAD_PERFORMANCE_EVENT_NAME } from '../../../performance';
import type { BlobStorageClient } from '../../types';
@ -65,23 +65,29 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
}
/**
* This function acts as a singleton i.t.o. execution: it can only be called once.
* Subsequent calls should not re-execute it.
*
* There is a known issue where calling this function simultaneously can result
* in a race condition where one of the calls will fail because the index is already
* being created. This is only an issue for the very first time the index is being
* created.
* This function acts as a singleton i.t.o. execution: it can only be called once per index.
* Subsequent calls for the same index should not re-execute it.
*/
private static createIndexIfNotExists = once(
async (index: string, esClient: ElasticsearchClient, logger: Logger): Promise<void> => {
protected static createIndexIfNotExists = memoize(
async (
index: string,
esClient: ElasticsearchClient,
logger: Logger,
indexIsAlias: boolean
): Promise<void> => {
// We don't attempt to create the index if it is an Alias/DS
if (indexIsAlias) {
logger.debug(`No need to create index [${index}] as it is an Alias or DS.`);
return;
}
try {
if (await esClient.indices.exists({ index })) {
logger.debug(`${index} already exists.`);
logger.debug(`[${index}] already exists. Nothing to do`);
return;
}
logger.info(`Creating ${index} for Elasticsearch blob store.`);
logger.info(`Creating [${index}] index for Elasticsearch blob store.`);
await esClient.indices.create({
index,
@ -96,7 +102,9 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
});
} catch (e) {
if (e instanceof errors.ResponseError && e.statusCode === 400) {
logger.warn('Unable to create blob storage index, it may have been created already.');
logger.warn(
`Unable to create blob storage index [${index}], it may have been created already.`
);
}
// best effort
}
@ -109,7 +117,8 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
await ElasticsearchBlobStorageClient.createIndexIfNotExists(
this.index,
this.esClient,
this.logger
this.logger,
this.indexIsAlias
);
const processUpload = async () => {
@ -123,6 +132,7 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
parameters: {
maxChunkSize: this.chunkSize,
},
indexIsAlias: this.indexIsAlias,
});
const start = performance.now();
@ -183,6 +193,7 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
client: this.esClient,
index: this.index,
logger: this.logger.get('content-stream-delete'),
indexIsAlias: this.indexIsAlias,
});
/** @note Overwriting existing content with an empty buffer to remove all the chunks. */
await promisify(dest.end.bind(dest, '', 'utf8'))();

View file

@ -29,6 +29,7 @@ import { FileMetadataClient } from '../file_client';
import { SavedObjectsFileMetadataClient } from '../file_client/file_metadata_client/adapters/saved_objects';
import { File as IFile } from '../../common';
import { createFileHashTransform } from '..';
import { FilesPluginError } from '../file_client/utils';
const setImmediate = promisify(global.setImmediate);
@ -82,7 +83,9 @@ describe('File', () => {
const [{ returnValue: blobStore }] = createBlobSpy.getCalls();
const blobStoreSpy = sandbox.spy(blobStore, 'delete');
expect(blobStoreSpy.calledOnce).toBe(false);
await expect(file.uploadContent(Readable.from(['test']))).rejects.toThrow(new Error('test'));
await expect(file.uploadContent(Readable.from(['test']))).rejects.toThrow(
new FilesPluginError('ContentStream.indexChunk(): test')
);
await setImmediate();
expect(blobStoreSpy.calledOnce).toBe(true);
});

View file

@ -31,10 +31,11 @@ export interface CreateEsFileClientArgs {
*/
elasticsearchClient: ElasticsearchClient;
/**
* Treat the indices provided as Aliases. If set to true, ES `search()` will be used to
* retrieve the file info and content instead of `get()`. This is needed to ensure the
* content can be retrieved in cases where an index may have rolled over (ES `get()`
* needs a "real" index)
* Treat the indices provided as Aliases/Datastreams.
* When set to `true`:
* - additional ES calls will be made to get the real backing indexes
* - will not check if indexes exists and attempt to create them if not
* - an additional `@timestamp` property will be written to all documents (at root of document)
*/
indexIsAlias?: boolean;
/**

View file

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import { Logger } from '@kbn/logging';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { EsIndexFilesMetadataClient } from '../..';
import { FileMetadata } from '@kbn/shared-ux-file-types';
import { estypes } from '@elastic/elasticsearch';
describe('EsIndexFilesMetadataClient', () => {
let esClient: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
let logger: Logger;
const generateMetadata = (): FileMetadata => {
return {
created: '2023-06-26T17:33:35.968Z',
Updated: '2023-06-26T17:33:35.968Z',
Status: 'READY',
name: 'lol.gif',
mime_type: 'image/gif',
extension: 'gif',
FileKind: 'none',
size: 134751,
};
};
beforeEach(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
logger = loggingSystemMock.createLogger();
});
describe('and `indexIsAlias` prop is `true`', () => {
let metaClient: EsIndexFilesMetadataClient;
beforeEach(() => {
metaClient = new EsIndexFilesMetadataClient('foo', esClient, logger, true);
});
it('should NOT create index', async () => {
esClient.index.mockResolvedValue({ _id: '123' } as estypes.WriteResponseBase);
await metaClient.create({ id: '123', metadata: generateMetadata() });
expect(logger.debug).toHaveBeenCalledWith(
'No need to create index [foo] as it is an Alias or DS.'
);
});
it('should retrieve backing index on update', async () => {
// For `.getBackingIndex()`
esClient.search.mockResolvedValueOnce({
hits: { hits: [{ _index: 'foo-00001' } as estypes.SearchHit] },
} as estypes.SearchResponse);
// For `.get()`
esClient.search.mockResolvedValueOnce({
hits: { hits: [{ _source: { file: generateMetadata() } } as estypes.SearchHit] },
} as estypes.SearchResponse);
await metaClient.update({ id: '123', metadata: generateMetadata() });
expect(esClient.search).toHaveBeenCalledWith({
body: {
_source: false,
query: {
term: {
_id: '123',
},
},
size: 1,
},
index: 'foo',
});
expect(esClient.update).toHaveBeenCalledWith(expect.objectContaining({ index: 'foo-00001' }));
});
it('should write @timestamp on create', async () => {
esClient.index.mockResolvedValue({ _id: '123' } as estypes.WriteResponseBase);
await metaClient.create({ id: '123', metadata: generateMetadata() });
expect(esClient.index).toHaveBeenCalledWith(
expect.objectContaining({
document: expect.objectContaining({
'@timestamp': expect.any(String),
}),
})
);
});
});
});

View file

@ -14,6 +14,7 @@ import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { MappingProperty, SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import pLimit from 'p-limit';
import { wrapErrorAndReThrow } from '../../utils';
import type { FilesMetrics, FileMetadata, Pagination } from '../../../../common';
import type { FindFileArgs } from '../../../file_service';
import type {
@ -41,6 +42,8 @@ const fileMappings: MappingProperty = {
export interface FileDocument<M = unknown> {
file: FileMetadata<M>;
/** Written only when `indexIsAlias` is `true` */
'@timestamp'?: string;
}
export class EsIndexFilesMetadataClient<M = unknown> implements FileMetadataClient {
@ -52,32 +55,91 @@ export class EsIndexFilesMetadataClient<M = unknown> implements FileMetadataClie
) {}
private createIfNotExists = once(async () => {
// We don't attempt to create the index if it is an Alias/DS
if (this.indexIsAlias) {
this.logger.debug(`No need to create index [${this.index}] as it is an Alias or DS.`);
return;
}
try {
if (await this.esClient.indices.exists({ index: this.index })) {
return;
}
await this.esClient.indices.create({
index: this.index,
mappings: {
dynamic: false,
properties: {
file: fileMappings,
await this.esClient.indices
.create({
index: this.index,
mappings: {
dynamic: false,
properties: {
file: fileMappings,
},
},
},
});
})
.catch(
wrapErrorAndReThrow.withMessagePrefix('EsIndexFilesMetadataClient.createIfNotExists(): ')
);
this.logger.info(`index [${this.index}] created with default mappings.`);
} catch (e) {
this.logger.error(`Failed to create index [${this.index}]: ${e.message}`);
this.logger.debug(e);
// best effort
}
});
private async getBackingIndex(id: string): Promise<string> {
if (!this.indexIsAlias) {
return this.index;
}
const doc = await this.esClient
.search({
index: this.index,
body: {
size: 1,
query: {
term: {
_id: id,
},
},
_source: false, // suppress the document content
},
})
.catch(
wrapErrorAndReThrow.withMessagePrefix('EsIndexFilesMetadataClient.getBackingIndex(): ')
);
const docIndex = doc.hits.hits?.[0]?._index;
if (!docIndex) {
const err = new Error(
`Unable to determine backing index for file id [${id}] in index (alias) [${this.index}]`
);
this.logger.error(err);
throw err;
}
return docIndex;
}
async create({ id, metadata }: FileDescriptor<M>): Promise<FileDescriptor<M>> {
await this.createIfNotExists();
const result = await this.esClient.index<FileDocument>({
index: this.index,
id,
document: { file: metadata },
refresh: 'wait_for',
});
const result = await this.esClient
.index<FileDocument>({
index: this.index,
id,
document: {
file: metadata,
// Add `@timestamp` if index is an Alias/DS
...(this.indexIsAlias ? { '@timestamp': new Date().toISOString() } : {}),
},
op_type: 'create',
refresh: 'wait_for',
})
.catch(wrapErrorAndReThrow.withMessagePrefix('EsIndexFilesMetadataClient.create(): '));
return {
id: result._id,
metadata,
@ -90,24 +152,28 @@ export class EsIndexFilesMetadataClient<M = unknown> implements FileMetadataClie
if (indexIsAlias) {
doc = (
await esClient.search<FileDocument<M>>({
index,
body: {
size: 1,
query: {
term: {
_id: id,
await esClient
.search<FileDocument<M>>({
index,
body: {
size: 1,
query: {
term: {
_id: id,
},
},
},
},
})
})
.catch(wrapErrorAndReThrow.withMessagePrefix('EsIndexFilesMetadataClient.get(): '))
).hits.hits?.[0]?._source;
} else {
doc = (
await esClient.get<FileDocument<M>>({
index,
id,
})
await esClient
.get<FileDocument<M>>({
index,
id,
})
.catch(wrapErrorAndReThrow.withMessagePrefix('EsIndexFilesMetadataClient.get(): '))
)._source;
}
@ -141,16 +207,23 @@ export class EsIndexFilesMetadataClient<M = unknown> implements FileMetadataClie
}
async delete({ id }: DeleteArg): Promise<void> {
await this.esClient.delete({ index: this.index, id });
await this.esClient
.delete({ index: this.index, id })
.catch(wrapErrorAndReThrow.withMessagePrefix('EsIndexFilesMetadataClient.delete(): '));
}
async update({ id, metadata }: UpdateArgs<M>): Promise<FileDescriptor<M>> {
await this.esClient.update({
index: this.index,
id,
doc: { file: metadata },
refresh: 'wait_for',
});
const index = await this.getBackingIndex(id);
await this.esClient
.update({
index,
id,
doc: { file: metadata },
refresh: 'wait_for',
})
.catch(wrapErrorAndReThrow.withMessagePrefix('EsIndexFilesMetadataClient.update(): '));
return this.get({ id });
}
@ -167,14 +240,16 @@ export class EsIndexFilesMetadataClient<M = unknown> implements FileMetadataClie
total: number;
files: Array<FileDescriptor<unknown>>;
}> {
const result = await this.esClient.search<FileDocument<M>>({
track_total_hits: true,
index: this.index,
expand_wildcards: 'hidden',
query: filterArgsToESQuery({ ...filterArgs, attrPrefix: this.attrPrefix }),
...this.paginationToES({ page, perPage }),
sort: 'file.created',
});
const result = await this.esClient
.search<FileDocument<M>>({
track_total_hits: true,
index: this.index,
expand_wildcards: 'hidden',
query: filterArgsToESQuery({ ...filterArgs, attrPrefix: this.attrPrefix }),
...this.paginationToES({ page, perPage }),
sort: 'file.created',
})
.catch(wrapErrorAndReThrow.withMessagePrefix('EsIndexFilesMetadataClient.find(): '));
return {
total: (result.hits.total as SearchTotalHits).value,

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import { errors } from '@elastic/elasticsearch';
import type { FileMetadata } from '../../common';
export function createDefaultFileAttributes(): Pick<
@ -19,3 +20,46 @@ export function createDefaultFileAttributes(): Pick<
Updated: dateString,
};
}
export class FilesPluginError extends Error {
constructor(message: string, public readonly meta?: any) {
super(message);
// For debugging - capture name of subclasses
this.name = this.constructor.name;
}
}
interface WrapErrorAndReThrowInterface {
(e: Error, messagePrefix?: string): never;
withMessagePrefix: (messagePrefix: string) => (e: Error) => never;
}
/**
* A helper method that can be used with Promises to wrap errors encountered with more details
* info. Mainly useful with calls to SO/ES as those errors normally don't include a good stack
* trace that points to where the error occurred.
* @param e
* @param messagePrefix
*/
export const wrapErrorAndReThrow: WrapErrorAndReThrowInterface = (
e: Error,
messagePrefix: string = ''
): never => {
if (e instanceof FilesPluginError) {
throw e;
}
let details: string = '';
// Create additional details based on known errors
if (e instanceof errors.ResponseError) {
details = `\nRequest: ${e.meta.meta.request.params.method} ${e.meta.meta.request.params.path}`;
}
throw new FilesPluginError(messagePrefix + e.message + details, e);
};
wrapErrorAndReThrow.withMessagePrefix = (messagePrefix: string): ((e: Error) => never) => {
return (e: Error) => {
return wrapErrorAndReThrow(e, messagePrefix);
};
};

View file

@ -31,6 +31,7 @@
"@kbn/logging-mocks",
"@kbn/core-elasticsearch-server-mocks",
"@kbn/core-saved-objects-server-mocks",
"@kbn/logging",
],
"exclude": [
"target/**/*",