mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
[ResponseOps][Task Manager] fix infinite recursion in taskPool recursion (#172191)
resolves https://github.com/elastic/kibana/issues/172116 Prevents `taskPool.run()` from getting into an infinite recursive loop, by limiting the number of times it can recurse. ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios --------- Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
d31a15807b
commit
4b7dbc3a25
2 changed files with 61 additions and 4 deletions
|
@ -392,6 +392,43 @@ describe('TaskPool', () => {
|
|||
expect(shouldNotRun).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
// This test is from https://github.com/elastic/kibana/issues/172116
|
||||
// It's not clear how to reproduce the actual error, but it is easy to
|
||||
// reproduce with the wacky test below. It does log the exact error
|
||||
// from that issue, without the corresponding fix in task_pool.ts
|
||||
test('works when available workers is 0 but there are tasks to run', async () => {
|
||||
const logger = loggingSystemMock.create().get();
|
||||
const pool = new TaskPool({
|
||||
maxWorkers$: of(2),
|
||||
logger,
|
||||
});
|
||||
|
||||
const shouldRun = mockRun();
|
||||
|
||||
const taskId = uuidv4();
|
||||
const task1 = mockTask({ id: taskId, run: shouldRun });
|
||||
|
||||
// we need to alternate the values of `availableWorkers`. First it
|
||||
// should be 0, then 1, then 0, then 1, etc. This will cause task_pool.run
|
||||
// to partition tasks (0 to run, everything as leftover), then at the
|
||||
// end of run(), to check if it should recurse, it should be > 0.
|
||||
let awValue = 1;
|
||||
Object.defineProperty(pool, 'availableWorkers', {
|
||||
get() {
|
||||
return ++awValue % 2;
|
||||
},
|
||||
});
|
||||
|
||||
const result = await pool.run([task1]);
|
||||
expect(result).toBe(TaskPoolRunResult.RanOutOfCapacity);
|
||||
|
||||
expect((logger as jest.Mocked<Logger>).warn.mock.calls[0]).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"task pool run attempts exceeded 3; assuming ran out of capacity; availableWorkers: 0, tasksToRun: 0, leftOverTasks: 1, maxWorkers: 2, occupiedWorkers: 0, workerLoad: 0",
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
function mockRun() {
|
||||
return jest.fn(async () => {
|
||||
await sleep(0);
|
||||
|
|
|
@ -34,6 +34,7 @@ export enum TaskPoolRunResult {
|
|||
}
|
||||
|
||||
const VERSION_CONFLICT_MESSAGE = 'Task has been claimed by another Kibana service';
|
||||
const MAX_RUN_ATTEMPTS = 3;
|
||||
|
||||
/**
|
||||
* Runs tasks in batches, taking costs into account.
|
||||
|
@ -107,8 +108,27 @@ export class TaskPool {
|
|||
* @param {TaskRunner[]} tasks
|
||||
* @returns {Promise<boolean>}
|
||||
*/
|
||||
public run = async (tasks: TaskRunner[]): Promise<TaskPoolRunResult> => {
|
||||
const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, this.availableWorkers);
|
||||
public async run(tasks: TaskRunner[], attempt = 1): Promise<TaskPoolRunResult> {
|
||||
// Note `this.availableWorkers` is a getter with side effects, so we just want
|
||||
// to call it once for this bit of the code.
|
||||
const availableWorkers = this.availableWorkers;
|
||||
const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, availableWorkers);
|
||||
|
||||
if (attempt > MAX_RUN_ATTEMPTS) {
|
||||
const stats = [
|
||||
`availableWorkers: ${availableWorkers}`,
|
||||
`tasksToRun: ${tasksToRun.length}`,
|
||||
`leftOverTasks: ${leftOverTasks.length}`,
|
||||
`maxWorkers: ${this.maxWorkers}`,
|
||||
`occupiedWorkers: ${this.occupiedWorkers}`,
|
||||
`workerLoad: ${this.workerLoad}`,
|
||||
].join(', ');
|
||||
this.logger.warn(
|
||||
`task pool run attempts exceeded ${MAX_RUN_ATTEMPTS}; assuming ran out of capacity; ${stats}`
|
||||
);
|
||||
return TaskPoolRunResult.RanOutOfCapacity;
|
||||
}
|
||||
|
||||
if (tasksToRun.length) {
|
||||
await Promise.all(
|
||||
tasksToRun
|
||||
|
@ -144,14 +164,14 @@ export class TaskPool {
|
|||
|
||||
if (leftOverTasks.length) {
|
||||
if (this.availableWorkers) {
|
||||
return this.run(leftOverTasks);
|
||||
return this.run(leftOverTasks, attempt + 1);
|
||||
}
|
||||
return TaskPoolRunResult.RanOutOfCapacity;
|
||||
} else if (!this.availableWorkers) {
|
||||
return TaskPoolRunResult.RunningAtCapacity;
|
||||
}
|
||||
return TaskPoolRunResult.RunningAllClaimedTasks;
|
||||
};
|
||||
}
|
||||
|
||||
public cancelRunningTasks() {
|
||||
this.logger.debug('Cancelling running tasks.');
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue