mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[Task Manager] improves cancelation messaging in Task Manager (#64075)
Instead of warning that a task isn't cancellable despite having been cancelled, we now do so as a debug warning. We now warn when a task has expired and is about to be cancelled including when it expired and how long it ran for.
This commit is contained in:
parent
58d56884c2
commit
dc5fb63cb9
4 changed files with 129 additions and 10 deletions
|
@ -9,6 +9,7 @@ import { TaskPool, TaskPoolRunResult } from './task_pool';
|
|||
import { mockLogger, resolvable, sleep } from './test_utils';
|
||||
import { asOk } from './lib/result_type';
|
||||
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';
|
||||
import moment from 'moment';
|
||||
|
||||
describe('TaskPool', () => {
|
||||
test('occupiedWorkers are a sum of running tasks', async () => {
|
||||
|
@ -190,14 +191,16 @@ describe('TaskPool', () => {
|
|||
});
|
||||
|
||||
test('run cancels expired tasks prior to running new tasks', async () => {
|
||||
const logger = mockLogger();
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 2,
|
||||
logger: mockLogger(),
|
||||
logger,
|
||||
});
|
||||
|
||||
const expired = resolvable();
|
||||
const shouldRun = sinon.spy(() => Promise.resolve());
|
||||
const shouldNotRun = sinon.spy(() => Promise.resolve());
|
||||
const now = new Date();
|
||||
const result = await pool.run([
|
||||
{
|
||||
...mockTask(),
|
||||
|
@ -207,6 +210,16 @@ describe('TaskPool', () => {
|
|||
await sleep(10);
|
||||
return asOk({ state: {} });
|
||||
},
|
||||
get expiration() {
|
||||
return now;
|
||||
},
|
||||
get startedAt() {
|
||||
// 5 and a half minutes
|
||||
return moment(now)
|
||||
.subtract(5, 'm')
|
||||
.subtract(30, 's')
|
||||
.toDate();
|
||||
},
|
||||
cancel: shouldRun,
|
||||
},
|
||||
{
|
||||
|
@ -231,6 +244,10 @@ describe('TaskPool', () => {
|
|||
|
||||
expect(pool.occupiedWorkers).toEqual(2);
|
||||
expect(pool.availableWorkers).toEqual(0);
|
||||
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
`Cancelling task TaskType "shooooo" as it expired at ${now.toISOString()} after running for 05m 30s (with timeout set at 5m).`
|
||||
);
|
||||
});
|
||||
|
||||
test('logs if cancellation errors', async () => {
|
||||
|
@ -285,6 +302,20 @@ describe('TaskPool', () => {
|
|||
markTaskAsRunning: jest.fn(async () => true),
|
||||
run: mockRun(),
|
||||
toString: () => `TaskType "shooooo"`,
|
||||
get expiration() {
|
||||
return new Date();
|
||||
},
|
||||
get startedAt() {
|
||||
return new Date();
|
||||
},
|
||||
get definition() {
|
||||
return {
|
||||
type: '',
|
||||
title: '',
|
||||
timeout: '5m',
|
||||
createTaskRunner: jest.fn(),
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
});
|
||||
|
|
|
@ -8,7 +8,9 @@
|
|||
* This module contains the logic that ensures we don't run too many
|
||||
* tasks at once in a given Kibana instance.
|
||||
*/
|
||||
import moment, { Duration } from 'moment';
|
||||
import { performance } from 'perf_hooks';
|
||||
import { padLeft } from 'lodash';
|
||||
import { Logger } from './types';
|
||||
import { TaskRunner } from './task_runner';
|
||||
import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';
|
||||
|
@ -148,7 +150,19 @@ export class TaskPool {
|
|||
private cancelExpiredTasks() {
|
||||
for (const task of this.running) {
|
||||
if (task.isExpired) {
|
||||
this.logger.debug(`Cancelling expired task ${task.toString()}.`);
|
||||
this.logger.warn(
|
||||
`Cancelling task ${task.toString()} as it expired at ${task.expiration.toISOString()}${
|
||||
task.startedAt
|
||||
? ` after running for ${durationAsString(
|
||||
moment.duration(
|
||||
moment(new Date())
|
||||
.utc()
|
||||
.diff(task.startedAt)
|
||||
)
|
||||
)}`
|
||||
: ``
|
||||
}${task.definition.timeout ? ` (with timeout set at ${task.definition.timeout})` : ``}.`
|
||||
);
|
||||
this.cancelTask(task);
|
||||
}
|
||||
}
|
||||
|
@ -169,3 +183,8 @@ function partitionListByCount<T>(list: T[], count: number): [T[], T[]] {
|
|||
const listInCount = list.splice(0, count);
|
||||
return [listInCount, list];
|
||||
}
|
||||
|
||||
function durationAsString(duration: Duration): string {
|
||||
const [m, s] = [duration.minutes(), duration.seconds()].map(value => padLeft(`${value}`, 2, '0'));
|
||||
return `${m}m ${s}s`;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import { ConcreteTaskInstance, TaskStatus } from './task';
|
|||
import { TaskManagerRunner } from './task_runner';
|
||||
import { mockLogger } from './test_utils';
|
||||
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';
|
||||
import moment from 'moment';
|
||||
|
||||
let fakeTimer: sinon.SinonFakeTimers;
|
||||
|
||||
|
@ -113,6 +114,60 @@ describe('TaskManagerRunner', () => {
|
|||
expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime());
|
||||
});
|
||||
|
||||
test('expiration returns time after which timeout will have elapsed from start', async () => {
|
||||
const now = moment();
|
||||
const { runner } = testOpts({
|
||||
instance: {
|
||||
schedule: { interval: '10m' },
|
||||
status: TaskStatus.Running,
|
||||
startedAt: now.toDate(),
|
||||
},
|
||||
definitions: {
|
||||
bar: {
|
||||
timeout: `1m`,
|
||||
createTaskRunner: () => ({
|
||||
async run() {
|
||||
return;
|
||||
},
|
||||
}),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await runner.run();
|
||||
|
||||
expect(runner.isExpired).toBe(false);
|
||||
expect(runner.expiration).toEqual(now.add(1, 'm').toDate());
|
||||
});
|
||||
|
||||
test('runDuration returns duration which has elapsed since start', async () => {
|
||||
const now = moment()
|
||||
.subtract(30, 's')
|
||||
.toDate();
|
||||
const { runner } = testOpts({
|
||||
instance: {
|
||||
schedule: { interval: '10m' },
|
||||
status: TaskStatus.Running,
|
||||
startedAt: now,
|
||||
},
|
||||
definitions: {
|
||||
bar: {
|
||||
timeout: `1m`,
|
||||
createTaskRunner: () => ({
|
||||
async run() {
|
||||
return;
|
||||
},
|
||||
}),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await runner.run();
|
||||
|
||||
expect(runner.isExpired).toBe(false);
|
||||
expect(runner.startedAt).toEqual(now);
|
||||
});
|
||||
|
||||
test('reschedules tasks that return a runAt', async () => {
|
||||
const runAt = minutesFromNow(_.random(1, 10));
|
||||
const { runner, store } = testOpts({
|
||||
|
@ -208,7 +263,7 @@ describe('TaskManagerRunner', () => {
|
|||
expect(logger.warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('warns if cancel is called on a non-cancellable task', async () => {
|
||||
test('debug logs if cancel is called on a non-cancellable task', async () => {
|
||||
const { runner, logger } = testOpts({
|
||||
definitions: {
|
||||
bar: {
|
||||
|
@ -223,10 +278,7 @@ describe('TaskManagerRunner', () => {
|
|||
await runner.cancel();
|
||||
await promise;
|
||||
|
||||
expect(logger.warn).toHaveBeenCalledTimes(1);
|
||||
expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot(
|
||||
`"The task bar \\"foo\\" is not cancellable."`
|
||||
);
|
||||
expect(logger.debug).toHaveBeenCalledWith(`The task bar "foo" is not cancellable.`);
|
||||
});
|
||||
|
||||
test('sets startedAt, status, attempts and retryAt when claiming a task', async () => {
|
||||
|
|
|
@ -39,6 +39,9 @@ const EMPTY_RUN_RESULT: SuccessfulRunResult = {};
|
|||
|
||||
export interface TaskRunner {
|
||||
isExpired: boolean;
|
||||
expiration: Date;
|
||||
startedAt: Date | null;
|
||||
definition: TaskDefinition;
|
||||
cancel: CancelFunction;
|
||||
markTaskAsRunning: () => Promise<boolean>;
|
||||
run: () => Promise<Result<SuccessfulRunResult, FailedRunResult>>;
|
||||
|
@ -129,11 +132,25 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
return this.definitions[this.taskType];
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the time at which this task will expire.
|
||||
*/
|
||||
public get expiration() {
|
||||
return intervalFromDate(this.instance.startedAt!, this.definition.timeout)!;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the duration of the current task run
|
||||
*/
|
||||
public get startedAt() {
|
||||
return this.instance.startedAt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets whether or not this task has run longer than its expiration setting allows.
|
||||
*/
|
||||
public get isExpired() {
|
||||
return intervalFromDate(this.instance.startedAt!, this.definition.timeout)! < new Date();
|
||||
return this.expiration < new Date();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -261,12 +278,12 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
*/
|
||||
public async cancel() {
|
||||
const { task } = this;
|
||||
if (task && task.cancel) {
|
||||
if (task?.cancel) {
|
||||
this.task = undefined;
|
||||
return task.cancel();
|
||||
}
|
||||
|
||||
this.logger.warn(`The task ${this} is not cancellable.`);
|
||||
this.logger.debug(`The task ${this} is not cancellable.`);
|
||||
}
|
||||
|
||||
private validateResult(result?: RunResult | void): Result<SuccessfulRunResult, FailedRunResult> {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue