mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
[kbn-pm] Max concurrency per batch (#16920)
This commit is contained in:
parent
79efbb61f7
commit
34ec484658
3 changed files with 116 additions and 8 deletions
47
packages/kbn-pm/dist/index.js
vendored
47
packages/kbn-pm/dist/index.js
vendored
|
@ -11130,12 +11130,9 @@ Object.defineProperty(exports, "__esModule", {
|
|||
let parallelizeBatches = exports.parallelizeBatches = (() => {
|
||||
var _ref = _asyncToGenerator(function* (batches, fn) {
|
||||
for (const batch of batches) {
|
||||
const running = batch.map(function (obj) {
|
||||
return fn(obj);
|
||||
});
|
||||
// We need to make sure the entire batch has completed before we can move on
|
||||
// to the next batch
|
||||
yield Promise.all(running);
|
||||
yield parallelize(batch, fn);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -11144,6 +11141,48 @@ let parallelizeBatches = exports.parallelizeBatches = (() => {
|
|||
};
|
||||
})();
|
||||
|
||||
let parallelize = exports.parallelize = (() => {
|
||||
var _ref2 = _asyncToGenerator(function* (items, fn, concurrency = 4) {
|
||||
if (items.length === 0) {
|
||||
return;
|
||||
}
|
||||
return new Promise(function (resolve, reject) {
|
||||
let scheduleItem = (() => {
|
||||
var _ref3 = _asyncToGenerator(function* (item) {
|
||||
activePromises++;
|
||||
try {
|
||||
yield fn(item);
|
||||
activePromises--;
|
||||
if (values.length > 0) {
|
||||
// We have more work to do, so we schedule the next promise
|
||||
scheduleItem(values.shift());
|
||||
} else if (activePromises === 0) {
|
||||
// We have no more values left, and all items have completed, so we've
|
||||
// completed all the work.
|
||||
resolve();
|
||||
}
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
|
||||
return function scheduleItem(_x5) {
|
||||
return _ref3.apply(this, arguments);
|
||||
};
|
||||
})();
|
||||
|
||||
let activePromises = 0;
|
||||
const values = items.slice(0);
|
||||
|
||||
values.splice(0, concurrency).map(scheduleItem);
|
||||
});
|
||||
});
|
||||
|
||||
return function parallelize(_x3, _x4) {
|
||||
return _ref2.apply(this, arguments);
|
||||
};
|
||||
})();
|
||||
|
||||
function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
|
||||
|
||||
/***/ }),
|
||||
|
|
|
@ -55,6 +55,39 @@ test('parallelizes batches', async () => {
|
|||
expect(completed).toEqual(['bar', 'foo', 'baz', 'parallelizeBatches']);
|
||||
});
|
||||
|
||||
test('schedules at most 4 calls at the same time (concurrency)', async () => {
|
||||
const foo = createPromiseWithResolve();
|
||||
const bar = createPromiseWithResolve();
|
||||
const baz = createPromiseWithResolve();
|
||||
const quux = createPromiseWithResolve();
|
||||
const foobar = createPromiseWithResolve();
|
||||
|
||||
const batches = [[foo, bar, baz, quux, foobar]];
|
||||
const parallelize = parallelizeBatches(batches, async obj => {
|
||||
obj.called = true;
|
||||
await obj.promise;
|
||||
});
|
||||
|
||||
expect(foo.called).toBe(true);
|
||||
expect(bar.called).toBe(true);
|
||||
expect(baz.called).toBe(true);
|
||||
expect(quux.called).toBe(true);
|
||||
expect(foobar.called).toBe(false);
|
||||
|
||||
foo.resolve();
|
||||
await tick();
|
||||
|
||||
expect(foobar.called).toBe(true);
|
||||
|
||||
bar.resolve();
|
||||
baz.resolve();
|
||||
quux.resolve();
|
||||
foobar.resolve();
|
||||
await tick();
|
||||
|
||||
await expect(parallelize).resolves.toBe(undefined);
|
||||
});
|
||||
|
||||
test('rejects if any promise rejects', async () => {
|
||||
const foo = createPromiseWithResolve();
|
||||
const bar = createPromiseWithResolve();
|
||||
|
|
|
@ -1,12 +1,48 @@
|
|||
export async function parallelizeBatches<T>(
|
||||
batches: Array<T[]>,
|
||||
batches: T[][],
|
||||
fn: (item: T) => Promise<void>
|
||||
) {
|
||||
for (const batch of batches) {
|
||||
const running = batch.map(obj => fn(obj));
|
||||
|
||||
// We need to make sure the entire batch has completed before we can move on
|
||||
// to the next batch
|
||||
await Promise.all(running);
|
||||
await parallelize(batch, fn);
|
||||
}
|
||||
}
|
||||
|
||||
export async function parallelize<T>(
|
||||
items: T[],
|
||||
fn: (item: T) => Promise<void>,
|
||||
concurrency = 4
|
||||
) {
|
||||
if (items.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
let activePromises = 0;
|
||||
const values = items.slice(0);
|
||||
|
||||
async function scheduleItem(item: T) {
|
||||
activePromises++;
|
||||
|
||||
try {
|
||||
await fn(item);
|
||||
|
||||
activePromises--;
|
||||
|
||||
if (values.length > 0) {
|
||||
// We have more work to do, so we schedule the next promise
|
||||
scheduleItem(values.shift()!);
|
||||
} else if (activePromises === 0) {
|
||||
// We have no more values left, and all items have completed, so we've
|
||||
// completed all the work.
|
||||
resolve();
|
||||
}
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
}
|
||||
}
|
||||
|
||||
values.splice(0, concurrency).map(scheduleItem);
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue