[Files] Tests for createEsFileClient() usage with the indexIsAlias option, and minor refactor (#153815)
## Summary

This PR builds on top of https://github.com/elastic/kibana/pull/153342 and:

- adds tests for the `indexIsAlias` option that was added to `createEsFileClient()`
- removes the `fetchDoc()` utility (no longer needed now that the lookup is done inline)

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Parent: 2cb9628329
Commit: 8c6e58c7ad

6 changed files with 316 additions and 142 deletions
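
For context, a minimal sketch (not part of this commit) of how a consumer opts into the new behavior. The parameters mirror the setup in the new `create_es_file_client.test.ts` below; the import path and index names are assumptions for illustration:

```ts
import type { Logger } from '@kbn/core/server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
// Assumed import path (relative to the files plugin's file_client directory)
import { createEsFileClient } from './create_es_file_client';

async function getFileViaAlias(esClient: ElasticsearchClient, logger: Logger) {
  const fileClient = createEsFileClient({
    logger,
    metadataIndex: 'file-meta', // placeholder: an alias (e.g. backed by a data stream)
    blobStorageIndex: 'file-data', // placeholder
    elasticsearchClient: esClient,
    indexIsAlias: true, // the option exercised by the new tests
  });

  // With indexIsAlias: true, the client resolves the metadata document via
  // es.search() (term query on _id, size: 1) instead of a direct es.get(),
  // and rejects with 'File not found' when the search returns no hits.
  return fileClient.get({ id: '123' });
}
```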

### content_stream.test.ts

```diff
@@ -13,6 +13,8 @@ import { encode } from 'cbor-x';
 import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
 import { ContentStream, ContentStreamEncoding, ContentStreamParameters } from './content_stream';
 import type { GetResponse } from '@elastic/elasticsearch/lib/api/types';
+import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
+import { FileDocument } from '../../../../file_client/file_metadata_client/adapters/es_index';
 
 describe('ContentStream', () => {
   let client: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
```
```diff
@@ -30,8 +32,9 @@ describe('ContentStream', () => {
       encoding: 'base64' as ContentStreamEncoding,
       size: 1,
     } as ContentStreamParameters,
+    indexIsAlias = false,
   } = {}) => {
-    return new ContentStream(client, id, index, logger, params);
+    return new ContentStream(client, id, index, logger, params, indexIsAlias);
   };
 
   beforeEach(() => {
```
```diff
@@ -43,124 +46,193 @@ describe('ContentStream', () => {
   });
 
   describe('read', () => {
-    beforeEach(() => {
-      stream = getContentStream({ params: { size: 1 } });
-    });
-
-    it('should perform a search using index and the document id', async () => {
-      await new Promise((resolve) => stream.once('data', resolve));
-
-      expect(client.get).toHaveBeenCalledTimes(1);
-
-      const [[request]] = client.get.mock.calls;
-      expect(request).toHaveProperty('index', 'somewhere');
-      expect(request).toHaveProperty('id', 'something.0');
-    });
-
-    it('should read the document contents', async () => {
-      const data = await new Promise((resolve) => stream.once('data', resolve));
-      expect(data).toEqual(Buffer.from('some content'));
-    });
-
-    it('should be an empty stream on empty response', async () => {
-      client.get.mockResponseOnce(toReadable());
-      const onData = jest.fn();
-
-      stream.on('data', onData);
-      await new Promise((resolve) => stream.once('end', resolve));
-
-      expect(onData).not.toHaveBeenCalled();
-    });
-
-    it('should emit an error event', async () => {
-      client.get.mockRejectedValueOnce('some error');
-
-      stream.read();
-      const error = await new Promise((resolve) => stream.once('error', resolve));
-
-      expect(error).toBe('some error');
-    });
-
-    it('should decode base64 encoded content', async () => {
-      client.get.mockResponseOnce(
-        toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content')))
-      );
-      const data = await new Promise((resolve) => stream.once('data', resolve));
-
-      expect(data).toEqual(Buffer.from('encoded content'));
-    });
-
-    it('should compound content from multiple chunks', async () => {
-      const [one, two, three] = ['12', '34', '56'].map(Buffer.from);
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
-
-      stream = getContentStream({
-        params: { size: 6 },
-      });
-
-      let data = '';
-      for await (const chunk of stream) {
-        data += chunk;
-      }
-
-      expect(data).toEqual('123456');
-      expect(client.get).toHaveBeenCalledTimes(3);
-
-      const [[request1], [request2], [request3]] = client.get.mock.calls;
-
-      expect(request1).toHaveProperty('index', 'somewhere');
-      expect(request1).toHaveProperty('id', 'something.0');
-      expect(request2).toHaveProperty('index', 'somewhere');
-      expect(request2).toHaveProperty('id', 'something.1');
-      expect(request3).toHaveProperty('index', 'somewhere');
-      expect(request3).toHaveProperty('id', 'something.2');
-    });
-
-    it('should stop reading on empty chunk', async () => {
-      const [one, two, three] = ['12', '34', ''].map(Buffer.from);
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
-      stream = getContentStream({ params: { size: 12 } });
-      let data = '';
-      for await (const chunk of stream) {
-        data += chunk;
-      }
-
-      expect(data).toEqual('1234');
-      expect(client.get).toHaveBeenCalledTimes(3);
-    });
-
-    it('should read while chunks are present when there is no size', async () => {
-      const [one, two] = ['12', '34'].map(Buffer.from);
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
-      client.get.mockResponseOnce(toReadable({ found: true }));
-      stream = getContentStream({ params: { size: undefined } });
-      let data = '';
-      for await (const chunk of stream) {
-        data += chunk;
-      }
-
-      expect(data).toEqual('1234');
-      expect(client.get).toHaveBeenCalledTimes(3);
-    });
-
-    it('should decode every chunk separately', async () => {
-      const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from);
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
-      client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four)));
-      stream = getContentStream({ params: { size: 12 } });
-      let data = '';
-      for await (const chunk of stream) {
-        data += chunk;
-      }
-
-      expect(data).toEqual('123456');
-    });
+    describe('with `indexIsAlias` set to `true`', () => {
+      let searchResponse: estypes.SearchResponse<FileDocument<{}>>;
+
+      beforeEach(() => {
+        searchResponse = {
+          took: 3,
+          timed_out: false,
+          _shards: {
+            total: 2,
+            successful: 2,
+            skipped: 0,
+            failed: 0,
+          },
+          hits: {
+            total: {
+              value: 1,
+              relation: 'eq',
+            },
+            max_score: 0,
+            hits: [
+              {
+                _index: 'foo',
+                _id: '123',
+                _score: 1.0,
+              },
+            ],
+          },
+        };
+
+        client.search.mockResolvedValue(searchResponse);
+      });
+
+      it('should use es.search() to find chunk index', async () => {
+        stream = getContentStream({ params: { size: 1 }, indexIsAlias: true });
+        const data = await new Promise((resolve) => stream.once('data', resolve));
+
+        expect(client.search).toHaveBeenCalledWith({
+          body: {
+            _source: false,
+            query: {
+              term: {
+                _id: 'something.0',
+              },
+            },
+            size: 1,
+          },
+          index: 'somewhere',
+        });
+        expect(data).toEqual(Buffer.from('some content'));
+      });
+
+      it('should throw if chunk is not found', async () => {
+        searchResponse.hits.hits = [];
+        stream = getContentStream({ params: { size: 1 }, indexIsAlias: true });
+
+        const readPromise = new Promise((resolve, reject) => {
+          stream.once('data', resolve);
+          stream.once('error', reject);
+        });
+
+        await expect(readPromise).rejects.toHaveProperty(
+          'message',
+          'Unable to determine index for file chunk id [something.0] in index (alias) [somewhere]'
+        );
+      });
+    });
+
+    describe('with `indexIsAlias` set to `false`', () => {
+      beforeEach(() => {
+        stream = getContentStream({ params: { size: 1 } });
+      });
+
+      it('should perform a search using index and the document id', async () => {
+        await new Promise((resolve) => stream.once('data', resolve));
+
+        expect(client.get).toHaveBeenCalledTimes(1);
+
+        const [[request]] = client.get.mock.calls;
+        expect(request).toHaveProperty('index', 'somewhere');
+        expect(request).toHaveProperty('id', 'something.0');
+      });
+
+      it('should read the document contents', async () => {
+        const data = await new Promise((resolve) => stream.once('data', resolve));
+        expect(data).toEqual(Buffer.from('some content'));
+      });
+
+      it('should be an empty stream on empty response', async () => {
+        client.get.mockResponseOnce(toReadable());
+        const onData = jest.fn();
+
+        stream.on('data', onData);
+        await new Promise((resolve) => stream.once('end', resolve));
+
+        expect(onData).not.toHaveBeenCalled();
+      });
+
+      it('should emit an error event', async () => {
+        client.get.mockRejectedValueOnce('some error');
+
+        stream.read();
+        const error = await new Promise((resolve) => stream.once('error', resolve));
+
+        expect(error).toBe('some error');
+      });
+
+      it('should decode base64 encoded content', async () => {
+        client.get.mockResponseOnce(
+          toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content')))
+        );
+        const data = await new Promise((resolve) => stream.once('data', resolve));
+
+        expect(data).toEqual(Buffer.from('encoded content'));
+      });
+
+      it('should compound content from multiple chunks', async () => {
+        const [one, two, three] = ['12', '34', '56'].map(Buffer.from);
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
+
+        stream = getContentStream({
+          params: { size: 6 },
+        });
+
+        let data = '';
+        for await (const chunk of stream) {
+          data += chunk;
+        }
+
+        expect(data).toEqual('123456');
+        expect(client.get).toHaveBeenCalledTimes(3);
+
+        const [[request1], [request2], [request3]] = client.get.mock.calls;
+
+        expect(request1).toHaveProperty('index', 'somewhere');
+        expect(request1).toHaveProperty('id', 'something.0');
+        expect(request2).toHaveProperty('index', 'somewhere');
+        expect(request2).toHaveProperty('id', 'something.1');
+        expect(request3).toHaveProperty('index', 'somewhere');
+        expect(request3).toHaveProperty('id', 'something.2');
+      });
+
+      it('should stop reading on empty chunk', async () => {
+        const [one, two, three] = ['12', '34', ''].map(Buffer.from);
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
+        stream = getContentStream({ params: { size: 12 } });
+        let data = '';
+        for await (const chunk of stream) {
+          data += chunk;
+        }
+
+        expect(data).toEqual('1234');
+        expect(client.get).toHaveBeenCalledTimes(3);
+      });
+
+      it('should read while chunks are present when there is no size', async () => {
+        const [one, two] = ['12', '34'].map(Buffer.from);
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
+        client.get.mockResponseOnce(toReadable({ found: true }));
+        stream = getContentStream({ params: { size: undefined } });
+        let data = '';
+        for await (const chunk of stream) {
+          data += chunk;
+        }
+
+        expect(data).toEqual('1234');
+        expect(client.get).toHaveBeenCalledTimes(3);
+      });
+
+      it('should decode every chunk separately', async () => {
+        const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from);
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
+        client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four)));
+        stream = getContentStream({ params: { size: 12 } });
+        let data = '';
+        for await (const chunk of stream) {
+          data += chunk;
+        }
+
+        expect(data).toEqual('123456');
+      });
+    });
   });
```
### content_stream.ts

```diff
@@ -101,12 +101,15 @@ export class ContentStream extends Duplex {
       },
     });
 
-    const docIndex = chunkDocMeta.hits.hits[0]._index;
+    const docIndex = chunkDocMeta.hits.hits?.[0]?._index;
 
     if (!docIndex) {
-      throw new Error(
+      const err = new Error(
         `Unable to determine index for file chunk id [${id}] in index (alias) [${this.index}]`
       );
+
+      this.logger.error(err);
+      throw err;
     }
 
     return docIndex;
```
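
Taken together with the tests above, the index resolution that this hunk hardens looks roughly like the following. This is a sketch rather than the exact Kibana source; the chunk-id format (`<documentId>.<chunkNumber>`, e.g. `something.0`) and the search body are taken from the test expectations:

```ts
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';

// Sketch: resolve the concrete backing index for a file chunk when `index`
// is an alias, by searching for the chunk's _id and reading hit metadata.
async function getChunkIndex(
  esClient: ElasticsearchClient,
  index: string, // alias name, e.g. 'somewhere'
  id: string // chunk id, e.g. 'something.0'
): Promise<string> {
  const chunkDocMeta = await esClient.search({
    index,
    body: {
      _source: false, // only hit metadata (_index) is needed, not the chunk data
      query: { term: { _id: id } },
      size: 1,
    },
  });

  const docIndex = chunkDocMeta.hits.hits?.[0]?._index;

  if (!docIndex) {
    // Mirrors the error message asserted in the tests above
    throw new Error(
      `Unable to determine index for file chunk id [${id}] in index (alias) [${index}]`
    );
  }

  return docIndex;
}
```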
### create_es_file_client.test.ts (new file, +104 lines)

```ts
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

import {
  type ElasticsearchClientMock,
  elasticsearchServiceMock,
  loggingSystemMock,
} from '@kbn/core/server/mocks';
import { MockedLogger } from '@kbn/logging-mocks';
import { createEsFileClient } from './create_es_file_client';
import { FileClient } from './types';
import { ElasticsearchBlobStorageClient } from '../blob_storage_service';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { FileDocument } from './file_metadata_client/adapters/es_index';

describe('When initializing file client via createESFileClient()', () => {
  let esClient: ElasticsearchClientMock;
  let logger: MockedLogger;

  beforeEach(() => {
    ElasticsearchBlobStorageClient.configureConcurrentUpload(Infinity);
    esClient = elasticsearchServiceMock.createElasticsearchClient();
    logger = loggingSystemMock.createLogger();
  });

  describe('and `indexIsAlias` argument is used', () => {
    let fileClient: FileClient;
    let searchResponse: estypes.SearchResponse<FileDocument<{}>>;

    beforeEach(() => {
      searchResponse = {
        took: 3,
        timed_out: false,
        _shards: {
          total: 2,
          successful: 2,
          skipped: 0,
          failed: 0,
        },
        hits: {
          total: {
            value: 1,
            relation: 'eq',
          },
          max_score: 0,
          hits: [
            {
              _index: 'foo',
              _id: '123',
              _score: 1.0,
              _source: {
                file: {
                  name: 'foo.txt',
                  Status: 'READY',
                  created: '2023-03-27T20:45:31.490Z',
                  Updated: '2023-03-27T20:45:31.490Z',
                  FileKind: '',
                },
              },
            },
          ],
        },
      };

      esClient.search.mockResolvedValue(searchResponse);
      fileClient = createEsFileClient({
        logger,
        metadataIndex: 'file-meta',
        blobStorageIndex: 'file-data',
        elasticsearchClient: esClient,
        indexIsAlias: true,
      });
    });

    it('should use es.search() to retrieve file metadata', async () => {
      await fileClient.get({ id: '123' });
      expect(esClient.search).toHaveBeenCalledWith({
        body: {
          query: {
            term: {
              _id: '123',
            },
          },
          size: 1,
        },
        index: 'file-meta',
      });
    });

    it('should throw an error if file is not found', async () => {
      (searchResponse.hits.total as estypes.SearchTotalHits).value = 0;
      searchResponse.hits.hits = [];
      await expect(fileClient.get({ id: '123 ' })).rejects.toHaveProperty(
        'message',
        'File not found'
      );
    });
  });
});
```
### file_metadata_client/adapters/es_index.ts

```diff
@@ -12,7 +12,6 @@ import { Logger } from '@kbn/core/server';
 import { toElasticsearchQuery } from '@kbn/es-query';
 import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
 import { MappingProperty, SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
-import { fetchDoc } from '../../utils';
 import type { FilesMetrics, FileMetadata, Pagination } from '../../../../common';
 import type { FindFileArgs } from '../../../file_service';
 import type {
@@ -36,7 +35,7 @@ const fileMappings: MappingProperty = {
   },
 };
 
-interface FileDocument<M = unknown> {
+export interface FileDocument<M = unknown> {
   file: FileMetadata<M>;
 }
 
@@ -82,11 +81,36 @@ export class EsIndexFilesMetadataClient<M = unknown> implements FileMetadataClient {
   }
 
   async get({ id }: GetArg): Promise<FileDescriptor<M>> {
-    const { _source: doc } =
-      (await fetchDoc<FileDocument<M>>(this.esClient, this.index, id, this.indexIsAlias)) ?? {};
+    const { esClient, index, indexIsAlias } = this;
+    let doc: FileDocument<M> | undefined;
+
+    if (indexIsAlias) {
+      doc = (
+        await esClient.search<FileDocument<M>>({
+          index,
+          body: {
+            size: 1,
+            query: {
+              term: {
+                _id: id,
+              },
+            },
+          },
+        })
+      ).hits.hits?.[0]?._source;
+    } else {
+      doc = (
+        await esClient.get<FileDocument<M>>({
+          index,
+          id,
+        })
+      )._source;
+    }
 
     if (!doc) {
-      this.logger.error(`File with id "${id}" not found`);
+      this.logger.error(
+        `File with id "${id}" not found in index ${indexIsAlias ? 'alias ' : ''}"${index}"`
+      );
       throw new Error('File not found');
     }
 
```
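
A note on the design choice here: a direct GET must resolve to a single concrete index, which an alias (for example, one backed by a data stream with several backing indices) may not, so the alias path presumably falls back to a `term` query on `_id` with `size: 1`. Moving that branching inline into `get()` is what makes the `fetchDoc()` helper, removed below, unnecessary, and the error log now names the index (or alias) that was searched.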
### utils.ts

```diff
@@ -6,8 +6,6 @@
  * Side Public License, v 1.
  */
 
-import { GetResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
-import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
 import type { FileMetadata } from '../../common';
 
 export function createDefaultFileAttributes(): Pick<
@@ -21,31 +19,3 @@ export function createDefaultFileAttributes(): Pick<
     Updated: dateString,
   };
 }
-
-export const fetchDoc = async <TDocument = unknown>(
-  esClient: ElasticsearchClient,
-  index: string,
-  docId: string,
-  indexIsAlias: boolean = false
-): Promise<GetResponse<TDocument> | undefined> => {
-  if (indexIsAlias) {
-    const fileDocSearchResult = await esClient.search<TDocument>({
-      index,
-      body: {
-        size: 1,
-        query: {
-          term: {
-            _id: docId,
-          },
-        },
-      },
-    });
-
-    return fileDocSearchResult.hits.hits[0] as GetResponse<TDocument>;
-  }
-
-  return esClient.get<TDocument>({
-    index,
-    id: docId,
-  });
-};
```
### tsconfig.json

```diff
@@ -28,6 +28,7 @@
     "@kbn/core-logging-server-mocks",
     "@kbn/ecs",
     "@kbn/safer-lodash-set",
+    "@kbn/logging-mocks",
   ],
   "exclude": [
     "target/**/*",
```