[6.x] [kbn-pm] Max concurrency per batch (#19024)

This commit is contained in:
Aleh Zasypkin 2018-05-18 10:15:19 +02:00 committed by GitHub
parent 8455bcdbba
commit 4f75ab2673
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 116 additions and 8 deletions

View file

@ -11130,12 +11130,9 @@ Object.defineProperty(exports, "__esModule", {
let parallelizeBatches = exports.parallelizeBatches = (() => {
var _ref = _asyncToGenerator(function* (batches, fn) {
for (const batch of batches) {
const running = batch.map(function (obj) {
return fn(obj);
});
// We need to make sure the entire batch has completed before we can move on
// to the next batch
yield Promise.all(running);
yield parallelize(batch, fn);
}
});
@ -11144,6 +11141,48 @@ let parallelizeBatches = exports.parallelizeBatches = (() => {
};
})();
let parallelize = exports.parallelize = (() => {
var _ref2 = _asyncToGenerator(function* (items, fn, concurrency = 4) {
if (items.length === 0) {
return;
}
return new Promise(function (resolve, reject) {
let scheduleItem = (() => {
var _ref3 = _asyncToGenerator(function* (item) {
activePromises++;
try {
yield fn(item);
activePromises--;
if (values.length > 0) {
// We have more work to do, so we schedule the next promise
scheduleItem(values.shift());
} else if (activePromises === 0) {
// We have no more values left, and all items have completed, so we've
// completed all the work.
resolve();
}
} catch (error) {
reject(error);
}
});
return function scheduleItem(_x5) {
return _ref3.apply(this, arguments);
};
})();
let activePromises = 0;
const values = items.slice(0);
values.splice(0, concurrency).map(scheduleItem);
});
});
return function parallelize(_x3, _x4) {
return _ref2.apply(this, arguments);
};
})();
function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
/***/ }),

View file

@ -55,6 +55,39 @@ test('parallelizes batches', async () => {
expect(completed).toEqual(['bar', 'foo', 'baz', 'parallelizeBatches']);
});
test('schedules at most 4 calls at the same time (concurrency)', async () => {
const foo = createPromiseWithResolve();
const bar = createPromiseWithResolve();
const baz = createPromiseWithResolve();
const quux = createPromiseWithResolve();
const foobar = createPromiseWithResolve();
const batches = [[foo, bar, baz, quux, foobar]];
const parallelize = parallelizeBatches(batches, async obj => {
obj.called = true;
await obj.promise;
});
expect(foo.called).toBe(true);
expect(bar.called).toBe(true);
expect(baz.called).toBe(true);
expect(quux.called).toBe(true);
expect(foobar.called).toBe(false);
foo.resolve();
await tick();
expect(foobar.called).toBe(true);
bar.resolve();
baz.resolve();
quux.resolve();
foobar.resolve();
await tick();
await expect(parallelize).resolves.toBe(undefined);
});
test('rejects if any promise rejects', async () => {
const foo = createPromiseWithResolve();
const bar = createPromiseWithResolve();

View file

@ -1,12 +1,48 @@
export async function parallelizeBatches<T>(
batches: Array<T[]>,
batches: T[][],
fn: (item: T) => Promise<void>
) {
for (const batch of batches) {
const running = batch.map(obj => fn(obj));
// We need to make sure the entire batch has completed before we can move on
// to the next batch
await Promise.all(running);
await parallelize(batch, fn);
}
}
export async function parallelize<T>(
items: T[],
fn: (item: T) => Promise<void>,
concurrency = 4
) {
if (items.length === 0) {
return;
}
return new Promise<void>((resolve, reject) => {
let activePromises = 0;
const values = items.slice(0);
async function scheduleItem(item: T) {
activePromises++;
try {
await fn(item);
activePromises--;
if (values.length > 0) {
// We have more work to do, so we schedule the next promise
scheduleItem(values.shift()!);
} else if (activePromises === 0) {
// We have no more values left, and all items have completed, so we've
// completed all the work.
resolve();
}
} catch (error) {
reject(error);
}
}
values.splice(0, concurrency).map(scheduleItem);
});
}