mirror of
https://github.com/elastic/kibana.git
synced 2025-04-25 02:09:32 -04:00
improve client-side SO client get pooling (#82603)
This commit is contained in:
parent
706be6b10a
commit
ed47da8e87
2 changed files with 102 additions and 28 deletions
|
@ -97,6 +97,57 @@ describe('SavedObjectsClient', () => {
|
||||||
`);
|
`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('removes duplicates when calling `_bulk_get`', async () => {
|
||||||
|
// Await #get call to ensure batchQueue is empty and throttle has reset
|
||||||
|
await savedObjectsClient.get('type2', doc.id);
|
||||||
|
http.fetch.mockClear();
|
||||||
|
|
||||||
|
savedObjectsClient.get(doc.type, doc.id);
|
||||||
|
savedObjectsClient.get('some-type', 'some-id');
|
||||||
|
await savedObjectsClient.get(doc.type, doc.id);
|
||||||
|
|
||||||
|
expect(http.fetch).toHaveBeenCalledTimes(1);
|
||||||
|
expect(http.fetch.mock.calls[0]).toMatchInlineSnapshot(`
|
||||||
|
Array [
|
||||||
|
"/api/saved_objects/_bulk_get",
|
||||||
|
Object {
|
||||||
|
"body": "[{\\"id\\":\\"AVwSwFxtcMV38qjDZoQg\\",\\"type\\":\\"config\\"},{\\"id\\":\\"some-id\\",\\"type\\":\\"some-type\\"}]",
|
||||||
|
"method": "POST",
|
||||||
|
"query": undefined,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
`);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('resolves with correct object when there are duplicates present', async () => {
|
||||||
|
// Await #get call to ensure batchQueue is empty and throttle has reset
|
||||||
|
await savedObjectsClient.get('type2', doc.id);
|
||||||
|
http.fetch.mockClear();
|
||||||
|
|
||||||
|
const call1 = savedObjectsClient.get(doc.type, doc.id);
|
||||||
|
const objFromCall2 = await savedObjectsClient.get(doc.type, doc.id);
|
||||||
|
const objFromCall1 = await call1;
|
||||||
|
|
||||||
|
expect(objFromCall1.type).toBe(doc.type);
|
||||||
|
expect(objFromCall1.id).toBe(doc.id);
|
||||||
|
|
||||||
|
expect(objFromCall2.type).toBe(doc.type);
|
||||||
|
expect(objFromCall2.id).toBe(doc.id);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('do not share instances or references between duplicate callers', async () => {
|
||||||
|
// Await #get call to ensure batchQueue is empty and throttle has reset
|
||||||
|
await savedObjectsClient.get('type2', doc.id);
|
||||||
|
http.fetch.mockClear();
|
||||||
|
|
||||||
|
const call1 = savedObjectsClient.get(doc.type, doc.id);
|
||||||
|
const objFromCall2 = await savedObjectsClient.get(doc.type, doc.id);
|
||||||
|
const objFromCall1 = await call1;
|
||||||
|
|
||||||
|
objFromCall1.set('title', 'new title');
|
||||||
|
expect(objFromCall2.get('title')).toEqual('Example title');
|
||||||
|
});
|
||||||
|
|
||||||
test('resolves with SimpleSavedObject instance', async () => {
|
test('resolves with SimpleSavedObject instance', async () => {
|
||||||
const response = savedObjectsClient.get(doc.type, doc.id);
|
const response = savedObjectsClient.get(doc.type, doc.id);
|
||||||
await expect(response).resolves.toBeInstanceOf(SimpleSavedObject);
|
await expect(response).resolves.toBeInstanceOf(SimpleSavedObject);
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { cloneDeep, pick, throttle } from 'lodash';
|
import { pick, throttle, cloneDeep } from 'lodash';
|
||||||
import { resolve as resolveUrl } from 'url';
|
import { resolve as resolveUrl } from 'url';
|
||||||
import type { PublicMethodsOf } from '@kbn/utility-types';
|
import type { PublicMethodsOf } from '@kbn/utility-types';
|
||||||
|
|
||||||
|
@ -144,6 +144,23 @@ const API_BASE_URL = '/api/saved_objects/';
|
||||||
*/
|
*/
|
||||||
export type SavedObjectsClientContract = PublicMethodsOf<SavedObjectsClient>;
|
export type SavedObjectsClientContract = PublicMethodsOf<SavedObjectsClient>;
|
||||||
|
|
||||||
|
interface ObjectTypeAndId {
|
||||||
|
id: string;
|
||||||
|
type: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
const getObjectsToFetch = (queue: BatchQueueEntry[]): ObjectTypeAndId[] => {
|
||||||
|
const objects: ObjectTypeAndId[] = [];
|
||||||
|
const inserted = new Set<string>();
|
||||||
|
queue.forEach(({ id, type }) => {
|
||||||
|
if (!inserted.has(`${type}|${id}`)) {
|
||||||
|
objects.push({ id, type });
|
||||||
|
inserted.add(`${type}|${id}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return objects;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Saved Objects is Kibana's data persisentence mechanism allowing plugins to
|
* Saved Objects is Kibana's data persisentence mechanism allowing plugins to
|
||||||
* use Elasticsearch for storing plugin state. The client-side
|
* use Elasticsearch for storing plugin state. The client-side
|
||||||
|
@ -160,31 +177,34 @@ export class SavedObjectsClient {
|
||||||
* Throttled processing of get requests into bulk requests at 100ms interval
|
* Throttled processing of get requests into bulk requests at 100ms interval
|
||||||
*/
|
*/
|
||||||
private processBatchQueue = throttle(
|
private processBatchQueue = throttle(
|
||||||
() => {
|
async () => {
|
||||||
const queue = cloneDeep(this.batchQueue);
|
const queue = [...this.batchQueue];
|
||||||
this.batchQueue = [];
|
this.batchQueue = [];
|
||||||
|
|
||||||
this.bulkGet(queue)
|
try {
|
||||||
.then(({ savedObjects }) => {
|
const objectsToFetch = getObjectsToFetch(queue);
|
||||||
queue.forEach((queueItem) => {
|
const { saved_objects: savedObjects } = await this.performBulkGet(objectsToFetch);
|
||||||
const foundObject = savedObjects.find((savedObject) => {
|
|
||||||
return savedObject.id === queueItem.id && savedObject.type === queueItem.type;
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!foundObject) {
|
queue.forEach((queueItem) => {
|
||||||
return queueItem.resolve(
|
const foundObject = savedObjects.find((savedObject) => {
|
||||||
this.createSavedObject(pick(queueItem, ['id', 'type']) as SavedObject)
|
return savedObject.id === queueItem.id && savedObject.type === queueItem.type;
|
||||||
);
|
});
|
||||||
}
|
|
||||||
|
|
||||||
queueItem.resolve(foundObject);
|
if (foundObject) {
|
||||||
});
|
// multiple calls may have been requested the same object.
|
||||||
})
|
// we need to clone to avoid sharing references between the instances
|
||||||
.catch((err) => {
|
queueItem.resolve(this.createSavedObject(cloneDeep(foundObject)));
|
||||||
queue.forEach((queueItem) => {
|
} else {
|
||||||
queueItem.reject(err);
|
queueItem.resolve(
|
||||||
});
|
this.createSavedObject(pick(queueItem, ['id', 'type']) as SavedObject)
|
||||||
|
);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
} catch (err) {
|
||||||
|
queue.forEach((queueItem) => {
|
||||||
|
queueItem.reject(err);
|
||||||
|
});
|
||||||
|
}
|
||||||
},
|
},
|
||||||
BATCH_INTERVAL,
|
BATCH_INTERVAL,
|
||||||
{ leading: false }
|
{ leading: false }
|
||||||
|
@ -383,14 +403,8 @@ export class SavedObjectsClient {
|
||||||
* ])
|
* ])
|
||||||
*/
|
*/
|
||||||
public bulkGet = (objects: Array<{ id: string; type: string }> = []) => {
|
public bulkGet = (objects: Array<{ id: string; type: string }> = []) => {
|
||||||
const path = this.getPath(['_bulk_get']);
|
|
||||||
const filteredObjects = objects.map((obj) => pick(obj, ['id', 'type']));
|
const filteredObjects = objects.map((obj) => pick(obj, ['id', 'type']));
|
||||||
|
return this.performBulkGet(filteredObjects).then((resp) => {
|
||||||
const request: ReturnType<SavedObjectsApi['bulkGet']> = this.savedObjectsFetch(path, {
|
|
||||||
method: 'POST',
|
|
||||||
body: JSON.stringify(filteredObjects),
|
|
||||||
});
|
|
||||||
return request.then((resp) => {
|
|
||||||
resp.saved_objects = resp.saved_objects.map((d) => this.createSavedObject(d));
|
resp.saved_objects = resp.saved_objects.map((d) => this.createSavedObject(d));
|
||||||
return renameKeys<
|
return renameKeys<
|
||||||
PromiseType<ReturnType<SavedObjectsApi['bulkGet']>>,
|
PromiseType<ReturnType<SavedObjectsApi['bulkGet']>>,
|
||||||
|
@ -399,6 +413,15 @@ export class SavedObjectsClient {
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private async performBulkGet(objects: ObjectTypeAndId[]) {
|
||||||
|
const path = this.getPath(['_bulk_get']);
|
||||||
|
const request: ReturnType<SavedObjectsApi['bulkGet']> = this.savedObjectsFetch(path, {
|
||||||
|
method: 'POST',
|
||||||
|
body: JSON.stringify(objects),
|
||||||
|
});
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates an object
|
* Updates an object
|
||||||
*
|
*
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue