Improves performance of task execution in Task manager (#50047)

This PR includes three key changes:

1. Run tasks as soon as they have been marked as running, rather than waiting for the whole batch to be marked as running
2. Use refresh: false where possible, in place of wait_for, to speed up Task Manager's internal workflow
3. Instrument Task Manager to expose Activity / Inactivity metrics in performance test runs (a sketch of the underlying perf_hooks pattern follows below)
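
Change 3 builds on Node's perf_hooks marks and measures, collected through a PerformanceObserver. Below is a minimal, self-contained sketch of that pattern; the mark and measure names ('work.start', 'work.duration', ...) are illustrative only, not the ones this PR adds:

import { performance, PerformanceObserver } from 'perf_hooks';

// Collect every completed measure; the PR aggregates these into
// activity / inactivity buckets, here we simply log them.
const observer = new PerformanceObserver(list => {
  for (const entry of list.getEntries()) {
    console.log(`${entry.name}: ${entry.duration}ms`);
  }
});
observer.observe({ entryTypes: ['measure'] });

async function doSomeWork() {
  performance.mark('work.start');
  await new Promise(resolve => setTimeout(resolve, 100)); // stand-in for real work
  performance.mark('work.stop');
  performance.measure('work.duration', 'work.start', 'work.stop');
}

doSomeWork();
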
Gidi Meir Morris 2019-11-06 13:48:19 -05:00 committed by GitHub
parent 92b540d163
commit 3281a6833e
25 changed files with 895 additions and 479 deletions

View file

@ -18,6 +18,25 @@
*/
module.exports = (_, options = {}) => {
const overrides = [];
if (!process.env.ALLOW_PERFORMANCE_HOOKS_IN_TASK_MANAGER) {
overrides.push(
{
test: [/x-pack[\/\\]legacy[\/\\]plugins[\/\\]task_manager/],
plugins: [
[
'filter-imports',
{
imports: {
perf_hooks: ['performance'],
},
},
],
],
}
);
}
return {
presets: [
[
@ -39,7 +58,7 @@ module.exports = (_, options = {}) => {
modules: 'cjs',
corejs: 3,
...(options['@babel/preset-env'] || {})
...(options['@babel/preset-env'] || {}),
},
],
require('./common_preset'),
@ -48,9 +67,10 @@ module.exports = (_, options = {}) => {
[
require.resolve('babel-plugin-transform-define'),
{
'global.__BUILT_WITH_BABEL__': 'true'
}
]
]
'global.__BUILT_WITH_BABEL__': 'true',
},
],
],
overrides,
};
};

View file

@ -8,10 +8,11 @@
"@babel/plugin-syntax-dynamic-import": "^7.2.0",
"@babel/plugin-transform-modules-commonjs": "^7.5.0",
"@babel/preset-env": "^7.5.5",
"@babel/preset-react":"^7.0.0",
"@babel/preset-react": "^7.0.0",
"@babel/preset-typescript": "^7.3.3",
"@kbn/elastic-idx": "1.0.0",
"babel-plugin-add-module-exports": "^1.0.2",
"babel-plugin-filter-imports": "^3.0.0",
"babel-plugin-transform-define": "^1.3.1",
"babel-plugin-typescript-strip-namespaces": "^1.1.1"
}

View file

@ -34,6 +34,7 @@
"xpack.server": "legacy/server",
"xpack.snapshotRestore": "legacy/plugins/snapshot_restore",
"xpack.spaces": ["legacy/plugins/spaces", "plugins/spaces"],
"xpack.taskManager": "legacy/plugins/task_manager",
"xpack.transform": "legacy/plugins/transform",
"xpack.upgradeAssistant": "legacy/plugins/upgrade_assistant",
"xpack.uptime": "legacy/plugins/uptime",

View file

@ -8,16 +8,15 @@
// Which is basically the Haskell equivalent of Rust/ML/Scala's Result
// I'll reach out to others in Kibana to see if we can merge these into one type
// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type Ok<T> = {
export interface Ok<T> {
tag: 'ok';
value: T;
};
// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type Err<E> = {
}
export interface Err<E> {
tag: 'err';
error: E;
};
}
export type Result<T, E> = Ok<T> | Err<E>;
export function asOk<T>(value: T): Ok<T> {

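For readers unfamiliar with the Result type above, here is a minimal usage sketch. The asErr helper is assumed to mirror asOk (the hunk is truncated before it), so treat it as illustrative rather than quoted from this file:

interface Ok<T> { tag: 'ok'; value: T; }
interface Err<E> { tag: 'err'; error: E; }
type Result<T, E> = Ok<T> | Err<E>;

const asOk = <T>(value: T): Ok<T> => ({ tag: 'ok', value });
const asErr = <E>(error: E): Err<E> => ({ tag: 'err', error });

// Narrowing on the `tag` discriminant is what makes the union ergonomic.
function describeResult(result: Result<number, string>): string {
  return result.tag === 'ok' ? `value: ${result.value}` : `error: ${result.error}`;
}

describeResult(asOk(42));      // "value: 42"
describeResult(asErr('boom')); // "error: boom"
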
View file

@ -7,13 +7,14 @@
import _ from 'lodash';
import sinon from 'sinon';
import { fillPool } from './fill_pool';
import { TaskPoolRunResult } from '../task_pool';
describe('fillPool', () => {
test('stops filling when there are no more tasks in the store', async () => {
const tasks = [[1, 2, 3], [4, 5]];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const run = sinon.spy(async () => true);
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
const converter = _.identity;
await fillPool(run, fetchAvailableTasks, converter);
@ -25,7 +26,7 @@ describe('fillPool', () => {
const tasks = [[1, 2, 3], [4, 5]];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const run = sinon.spy(async () => false);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = _.identity;
await fillPool(run, fetchAvailableTasks, converter);
@ -37,7 +38,7 @@ describe('fillPool', () => {
const tasks = [[1, 2, 3], [4, 5]];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const run = sinon.spy(async () => false);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();
await fillPool(run, fetchAvailableTasks, converter);
@ -47,7 +48,7 @@ describe('fillPool', () => {
describe('error handling', () => {
test('throws exception from fetchAvailableTasks', async () => {
const run = sinon.spy(async () => false);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();
try {
@ -80,7 +81,7 @@ describe('fillPool', () => {
const tasks = [[1, 2, 3], [4, 5]];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const run = sinon.spy(async () => false);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => {
throw new Error(`can not convert ${x}`);
};

View file

@ -4,7 +4,15 @@
* you may not use this file except in compliance with the Elastic License.
*/
type BatchRun<T> = (tasks: T[]) => Promise<boolean>;
import { performance } from 'perf_hooks';
import { TaskPoolRunResult } from '../task_pool';
export enum FillPoolResult {
NoTasksClaimed = 'NoTasksClaimed',
RanOutOfCapacity = 'RanOutOfCapacity',
}
type BatchRun<T> = (tasks: T[]) => Promise<TaskPoolRunResult>;
type Fetcher<T> = () => Promise<T[]>;
type Converter<T1, T2> = (t: T1) => T2;
@ -24,18 +32,32 @@ export async function fillPool<TRecord, TRunner>(
run: BatchRun<TRunner>,
fetchAvailableTasks: Fetcher<TRecord>,
converter: Converter<TRecord, TRunner>
): Promise<void> {
): Promise<FillPoolResult> {
performance.mark('fillPool.start');
while (true) {
const instances = await fetchAvailableTasks();
if (!instances.length) {
return;
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
}
const tasks = instances.map(converter);
if (!(await run(tasks))) {
return;
if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
}
performance.mark('fillPool.cycle');
}
}

View file

@ -70,6 +70,7 @@ describe('addMiddlewareToChain', () => {
return opts;
},
beforeRun: defaultBeforeRun,
beforeMarkRunning: defaultBeforeRun,
};
const m2 = {
beforeSave: async (opts: BeforeSaveOpts) => {
@ -77,6 +78,7 @@ describe('addMiddlewareToChain', () => {
return opts;
},
beforeRun: defaultBeforeRun,
beforeMarkRunning: defaultBeforeRun,
};
const m3 = {
beforeSave: async (opts: BeforeSaveOpts) => {
@ -84,6 +86,7 @@ describe('addMiddlewareToChain', () => {
return opts;
},
beforeRun: defaultBeforeRun,
beforeMarkRunning: defaultBeforeRun,
};
let middlewareChain;
@ -119,6 +122,7 @@ describe('addMiddlewareToChain', () => {
m1: true,
};
},
beforeMarkRunning: defaultBeforeRun,
};
const m2 = {
beforeSave: defaultBeforeSave,
@ -128,6 +132,7 @@ describe('addMiddlewareToChain', () => {
m2: true,
};
},
beforeMarkRunning: defaultBeforeRun,
};
const m3 = {
beforeSave: defaultBeforeSave,
@ -137,6 +142,7 @@ describe('addMiddlewareToChain', () => {
m3: true,
};
},
beforeMarkRunning: defaultBeforeRun,
};
let middlewareChain;

View file

@ -23,10 +23,12 @@ export type BeforeSaveFunction = (
) => Promise<BeforeSaveMiddlewareParams>;
export type BeforeRunFunction = (params: RunContext) => Promise<RunContext>;
export type BeforeMarkRunningFunction = (params: RunContext) => Promise<RunContext>;
export interface Middleware {
beforeSave: BeforeSaveFunction;
beforeRun: BeforeRunFunction;
beforeMarkRunning: BeforeMarkRunningFunction;
}
export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) {
@ -39,8 +41,14 @@ export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Mid
? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun)
: prevMiddleware.beforeRun;
const beforeMarkRunning = middleware.beforeMarkRunning
? (params: RunContext) =>
middleware.beforeMarkRunning(params).then(prevMiddleware.beforeMarkRunning)
: prevMiddleware.beforeMarkRunning;
return {
beforeSave,
beforeRun,
beforeMarkRunning,
};
}

View file

@ -174,6 +174,7 @@ describe('TaskManager', () => {
const middleware = {
beforeSave: async (saveOpts: any) => saveOpts,
beforeRun: async (runOpts: any) => runOpts,
beforeMarkRunning: async (runOpts: any) => runOpts,
};
expect(() => client.addMiddleware(middleware)).not.toThrow();
});
@ -183,6 +184,7 @@ describe('TaskManager', () => {
const middleware = {
beforeSave: async (saveOpts: any) => saveOpts,
beforeRun: async (runOpts: any) => runOpts,
beforeMarkRunning: async (runOpts: any) => runOpts,
};
client.start();
@ -241,7 +243,10 @@ describe('TaskManager', () => {
claimAvailableTasks(claim, 10, logger);
sinon.assert.calledWithMatch(logger.warn, /inline scripts/);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot(
`"Task Manager cannot operate when inline scripts are disabled in Elasticsearch"`
);
});
});
});

View file

@ -3,10 +3,10 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { performance } from 'perf_hooks';
import { SavedObjectsClientContract, SavedObjectsSerializer } from 'src/core/server';
import { Logger } from './types';
import { fillPool } from './lib/fill_pool';
import { fillPool, FillPoolResult } from './lib/fill_pool';
import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware';
import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions';
import { intervalFromNow } from './lib/intervals';
@ -56,13 +56,14 @@ export class TaskManager {
private readonly pollerInterval: number;
private definitions: TaskDictionary<TaskDefinition>;
private store: TaskStore;
private poller: TaskPoller;
private poller: TaskPoller<FillPoolResult>;
private logger: Logger;
private pool: TaskPool;
private startQueue: Array<() => void> = [];
private middleware = {
beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts,
beforeRun: async (runOpts: RunContext) => runOpts,
beforeMarkRunning: async (runOpts: RunContext) => runOpts,
};
/**
@ -86,8 +87,6 @@ export class TaskManager {
this.logger.info(`TaskManager is identified by the Kibana UUID: ${taskManagerId}`);
}
/* Kibana UUID needs to be pulled live (not cached), as it takes a long time
* to initialize, and can change after startup */
const store = new TaskStore({
serializer: opts.serializer,
savedObjectsRepository: opts.savedObjectsRepository,
@ -109,13 +108,14 @@ export class TaskManager {
store,
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
beforeMarkRunning: this.middleware.beforeMarkRunning,
});
const poller = new TaskPoller({
const poller = new TaskPoller<FillPoolResult>({
logger: this.logger,
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
work: (): Promise<void> =>
work: (): Promise<FillPoolResult> =>
fillPool(
pool.run,
async tasks => await pool.run(tasks),
() =>
claimAvailableTasks(
this.store.claimAvailableTasks.bind(this.store),
@ -260,12 +260,24 @@ export async function claimAvailableTasks(
logger: Logger
) {
if (availableWorkers > 0) {
performance.mark('claimAvailableTasks_start');
try {
const { docs, claimedTasks } = await claim({
size: availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
});
if (claimedTasks === 0) {
performance.mark('claimAvailableTasks.noTasks');
}
performance.mark('claimAvailableTasks_stop');
performance.measure(
'claimAvailableTasks',
'claimAvailableTasks_start',
'claimAvailableTasks_stop'
);
if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
@ -282,6 +294,7 @@ export async function claimAvailableTasks(
}
}
} else {
performance.mark('claimAvailableTasks.noAvailableWorkers');
logger.info(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers. If this happens often, consider adjusting the "xpack.task_manager.max_workers" configuration.`
);

View file

@ -73,7 +73,9 @@ describe('TaskPoller', () => {
await doneWorking;
expect(count).toEqual(2);
sinon.assert.calledWithMatch(logger.error, /Dang it/i);
expect(logger.error.mock.calls[0][0]).toMatchInlineSnapshot(
`"Failed to poll for work: Error: Dang it!"`
);
});
test('is stoppable', async () => {

View file

@ -8,27 +8,28 @@
* This module contains the logic for polling the task manager index for new work.
*/
import { performance } from 'perf_hooks';
import { Logger } from './types';
type WorkFn = () => Promise<void>;
type WorkFn<T> = () => Promise<T>;
interface Opts {
interface Opts<T> {
pollInterval: number;
logger: Logger;
work: WorkFn;
work: WorkFn<T>;
}
/**
* Performs work on a scheduled interval, logging any errors. This waits for work to complete
* (or error) prior to attempting another run.
*/
export class TaskPoller {
export class TaskPoller<T> {
private isStarted = false;
private isWorking = false;
private timeout: any;
private pollInterval: number;
private logger: Logger;
private work: WorkFn;
private work: WorkFn<T>;
/**
* Constructs a new TaskPoller.
@ -38,7 +39,7 @@ export class TaskPoller {
* @prop {Logger} logger - The task manager logger
* @prop {WorkFn} work - An empty, asynchronous function that performs the desired work
*/
constructor(opts: Opts) {
constructor(opts: Opts<T>) {
this.pollInterval = opts.pollInterval;
this.logger = opts.logger;
this.work = opts.work;
@ -57,8 +58,16 @@ export class TaskPoller {
const poll = async () => {
await this.attemptWork();
performance.mark('TaskPoller.sleep');
if (this.isStarted) {
this.timeout = setTimeout(poll, this.pollInterval);
this.timeout = setTimeout(
tryAndLogOnError(() => {
performance.mark('TaskPoller.poll');
performance.measure('TaskPoller.sleepDuration', 'TaskPoller.sleep', 'TaskPoller.poll');
poll();
}, this.logger),
this.pollInterval
);
}
};
@ -94,3 +103,13 @@ export class TaskPoller {
}
}
}
function tryAndLogOnError(fn: Function, logger: Logger): Function {
return () => {
try {
fn();
} catch (err) {
logger.error(`Task Poller polling phase failed: ${err}`);
}
};
}
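
The doc comment above captures the poller's contract: each cycle awaits the work function before scheduling the next poll, so work never overlaps. A small self-contained sketch of that loop shape (an illustration, not the actual TaskPoller class):

type WorkFn<T> = () => Promise<T>;

function startPolling<T>(work: WorkFn<T>, pollInterval: number): () => void {
  let stopped = false;
  let timeout: ReturnType<typeof setTimeout> | undefined;
  const poll = async () => {
    try {
      await work();
    } catch (err) {
      console.error(`Failed to poll for work: ${err}`);
    }
    if (!stopped) {
      // only schedule the next poll once the current work has settled
      timeout = setTimeout(poll, pollInterval);
    }
  };
  poll();
  // returns a stop function, analogous to the real poller's stop behaviour
  return () => {
    stopped = true;
    if (timeout) {
      clearTimeout(timeout);
    }
  };
}
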

View file

@ -5,7 +5,7 @@
*/
import sinon from 'sinon';
import { TaskPool } from './task_pool';
import { TaskPool, TaskPoolRunResult } from './task_pool';
import { mockLogger, resolvable, sleep } from './test_utils';
describe('TaskPool', () => {
@ -17,7 +17,7 @@ describe('TaskPool', () => {
const result = await pool.run([{ ...mockTask() }, { ...mockTask() }, { ...mockTask() }]);
expect(result).toBeTruthy();
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
expect(pool.occupiedWorkers).toEqual(3);
});
@ -29,7 +29,7 @@ describe('TaskPool', () => {
const result = await pool.run([{ ...mockTask() }, { ...mockTask() }, { ...mockTask() }]);
expect(result).toBeTruthy();
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
expect(pool.availableWorkers).toEqual(7);
});
@ -48,10 +48,74 @@ describe('TaskPool', () => {
{ ...mockTask(), run: shouldNotRun },
]);
expect(result).toBeFalsy();
expect(result).toEqual(TaskPoolRunResult.RanOutOfCapacity);
expect(pool.availableWorkers).toEqual(0);
sinon.assert.calledTwice(shouldRun);
sinon.assert.notCalled(shouldNotRun);
expect(shouldRun).toHaveBeenCalledTimes(2);
expect(shouldNotRun).not.toHaveBeenCalled();
});
test('should log when marking a Task as running fails', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 2,
logger,
});
const taskFailedToMarkAsRunning = mockTask();
taskFailedToMarkAsRunning.markTaskAsRunning.mockImplementation(async () => {
throw new Error(`Mark Task as running has failed miserably`);
});
const result = await pool.run([mockTask(), taskFailedToMarkAsRunning, mockTask()]);
expect(logger.error.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"Failed to mark Task TaskType \\"shooooo\\" as running: Mark Task as running has failed miserably",
]
`);
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
});
test('should log when running a Task fails', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 3,
logger,
});
const taskFailedToRun = mockTask();
taskFailedToRun.run.mockImplementation(async () => {
throw new Error(`Run Task has failed miserably`);
});
const result = await pool.run([mockTask(), taskFailedToRun, mockTask()]);
expect(logger.warn.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"Task TaskType \\"shooooo\\" failed in attempt to run: Run Task has failed miserably",
]
`);
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
});
test('Running a task which fails still takes up capacity', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 1,
logger,
});
const taskFailedToRun = mockTask();
taskFailedToRun.run.mockImplementation(async () => {
await sleep(0);
throw new Error(`Run Task has failed miserably`);
});
const result = await pool.run([taskFailedToRun, mockTask()]);
expect(result).toEqual(TaskPoolRunResult.RanOutOfCapacity);
});
test('clears up capacity when a task completes', async () => {
@ -78,7 +142,7 @@ describe('TaskPool', () => {
{ ...mockTask(), run: secondRun },
]);
expect(result).toBeFalsy();
expect(result).toEqual(TaskPoolRunResult.RanOutOfCapacity);
expect(pool.occupiedWorkers).toEqual(1);
expect(pool.availableWorkers).toEqual(0);
@ -133,7 +197,7 @@ describe('TaskPool', () => {
},
]);
expect(result).toBeTruthy();
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
expect(pool.occupiedWorkers).toEqual(2);
expect(pool.availableWorkers).toEqual(0);
@ -173,7 +237,7 @@ describe('TaskPool', () => {
},
]);
expect(result).toBeTruthy();
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
await pool.run([]);
expect(pool.occupiedWorkers).toEqual(0);
@ -181,11 +245,13 @@ describe('TaskPool', () => {
// Allow the task to cancel...
await cancelled;
sinon.assert.calledWithMatch(logger.error, /Failed to cancel task "shooooo!"/);
expect(logger.error.mock.calls[0][0]).toMatchInlineSnapshot(
`"Failed to cancel task \\"shooooo!\\": Error: Dern!"`
);
});
function mockRun() {
return sinon.spy(async () => {
return jest.fn(async () => {
await sleep(0);
return { state: {} };
});
@ -195,8 +261,9 @@ describe('TaskPool', () => {
return {
isExpired: false,
cancel: async () => undefined,
markTaskAsRunning: async () => true,
markTaskAsRunning: jest.fn(async () => true),
run: mockRun(),
toString: () => `TaskType "shooooo"`,
};
}
});

View file

@ -8,7 +8,7 @@
* This module contains the logic that ensures we don't run too many
* tasks at once in a given Kibana instance.
*/
import { performance } from 'perf_hooks';
import { Logger } from './types';
import { TaskRunner } from './task_runner';
@ -17,6 +17,13 @@ interface Opts {
logger: Logger;
}
export enum TaskPoolRunResult {
RunningAllClaimedTasks = 'RunningAllClaimedTasks',
RanOutOfCapacity = 'RanOutOfCapacity',
}
const VERSION_CONFLICT_MESSAGE = 'Task has been claimed by another Kibana service';
/**
* Runs tasks in batches, taking costs into account.
*/
@ -66,38 +73,64 @@ export class TaskPool {
};
public cancelRunningTasks() {
this.logger.debug(`Cancelling running tasks.`);
this.logger.debug('Cancelling running tasks.');
for (const task of this.running) {
this.cancelTask(task);
}
}
private async attemptToRun(tasks: TaskRunner[]) {
for (const task of tasks) {
if (this.availableWorkers > 0) {
if (await task.markTaskAsRunning()) {
this.running.add(task);
task
.run()
.catch(err => {
this.logger.warn(`Task ${task} failed in attempt to run: ${err.message}`);
})
.then(() => this.running.delete(task));
} else {
this.logger.warn(`Failed to mark Task ${task} as running`);
}
} else {
return false;
}
private async attemptToRun(tasks: TaskRunner[]): Promise<TaskPoolRunResult> {
const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, this.availableWorkers);
if (tasksToRun.length) {
performance.mark('attemptToRun_start');
await Promise.all(
tasksToRun.map(
async task =>
await task
.markTaskAsRunning()
.then((hasTaskBeenMarkAsRunning: boolean) =>
hasTaskBeenMarkAsRunning
? this.handleMarkAsRunning(task)
: this.handleFailureOfMarkAsRunning(task, {
name: 'TaskPoolVersionConflictError',
message: VERSION_CONFLICT_MESSAGE,
})
)
.catch(ex => this.handleFailureOfMarkAsRunning(task, ex))
)
);
performance.mark('attemptToRun_stop');
performance.measure('taskPool.attemptToRun', 'attemptToRun_start', 'attemptToRun_stop');
}
return true;
if (leftOverTasks.length) {
if (this.availableWorkers) {
return this.attemptToRun(leftOverTasks);
}
return TaskPoolRunResult.RanOutOfCapacity;
}
return TaskPoolRunResult.RunningAllClaimedTasks;
}
private handleMarkAsRunning(task: TaskRunner) {
this.running.add(task);
task
.run()
.catch(err => {
this.logger.warn(`Task ${task.toString()} failed in attempt to run: ${err.message}`);
})
.then(() => this.running.delete(task));
}
private handleFailureOfMarkAsRunning(task: TaskRunner, err: Error) {
this.logger.error(`Failed to mark Task ${task.toString()} as running: ${err.message}`);
}
private cancelExpiredTasks() {
for (const task of this.running) {
if (task.isExpired) {
this.logger.debug(`Cancelling expired task ${task}.`);
this.logger.debug(`Cancelling expired task ${task.toString()}.`);
this.cancelTask(task);
}
}
@ -105,11 +138,16 @@ export class TaskPool {
private async cancelTask(task: TaskRunner) {
try {
this.logger.debug(`Cancelling task ${task}.`);
this.logger.debug(`Cancelling task ${task.toString()}.`);
this.running.delete(task);
await task.cancel();
} catch (err) {
this.logger.error(`Failed to cancel task ${task}: ${err}`);
this.logger.error(`Failed to cancel task ${task.toString()}: ${err}`);
}
}
}
function partitionListByCount<T>(list: T[], count: number): [T[], T[]] {
const listInCount = list.splice(0, count);
return [listInCount, list];
}
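
A note on partitionListByCount, since it drives the new attemptToRun flow: splice mutates the input array, so the caller receives the first `count` items to mark as running now plus the mutated remainder to retry once capacity frees up. A tiny self-contained illustration (the claimed task names are made up):

function partitionListByCount<T>(list: T[], count: number): [T[], T[]] {
  const listInCount = list.splice(0, count);
  return [listInCount, list];
}

const claimed = ['a', 'b', 'c', 'd', 'e'];
const availableWorkers = 2;
const [runNow, leftOver] = partitionListByCount(claimed, availableWorkers);
// runNow   -> ['a', 'b']
// leftOver -> ['c', 'd', 'e'] (and `claimed` itself now only holds these three)
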

View file

@ -10,6 +10,7 @@ import { minutesFromNow } from './lib/intervals';
import { ConcreteTaskInstance } from './task';
import { TaskManagerRunner } from './task_runner';
import { mockLogger } from './test_utils';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server/saved_objects/service/lib/errors';
let fakeTimer: sinon.SinonFakeTimers;
@ -202,7 +203,7 @@ describe('TaskManagerRunner', () => {
await promise;
expect(wasCancelled).toBeTruthy();
sinon.assert.neverCalledWithMatch(logger.warn, /not cancellable/);
expect(logger.warn).not.toHaveBeenCalled();
});
test('warns if cancel is called on a non-cancellable task', async () => {
@ -220,7 +221,10 @@ describe('TaskManagerRunner', () => {
await runner.cancel();
await promise;
sinon.assert.calledWithMatch(logger.warn, /not cancellable/);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot(
`"The task bar \\"foo\\" is not cancellable."`
);
});
test('sets startedAt, status, attempts and retryAt when claiming a task', async () => {
@ -420,6 +424,74 @@ describe('TaskManagerRunner', () => {
);
});
test('it returns false when markTaskAsRunning fails due to VERSION_CONFLICT_STATUS', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
const timeoutMinutes = 1;
const getRetryStub = sinon.stub().returns(nextRetry);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});
store.update = sinon
.stub()
.throws(
SavedObjectsErrorHelpers.decorateConflictError(new Error('repo error')).output.payload
);
expect(await runner.markTaskAsRunning()).toEqual(false);
});
test('it throw when markTaskAsRunning fails for unexpected reasons', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
const timeoutMinutes = 1;
const getRetryStub = sinon.stub().returns(nextRetry);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});
store.update = sinon
.stub()
.throws(SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id').output.payload);
return expect(runner.markTaskAsRunning()).rejects.toMatchInlineSnapshot(`
Object {
"error": "Not Found",
"message": "Saved object [type/id] not found",
"statusCode": 404,
}
`);
});
test('uses getRetry (returning true) to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
@ -601,6 +673,7 @@ describe('TaskManagerRunner', () => {
};
const runner = new TaskManagerRunner({
beforeRun: context => Promise.resolve(context),
beforeMarkRunning: context => Promise.resolve(context),
logger,
store,
instance: Object.assign(
@ -655,9 +728,10 @@ describe('TaskManagerRunner', () => {
await runner.run();
if (shouldBeValid) {
sinon.assert.notCalled(logger.warn);
expect(logger.warn).not.toHaveBeenCalled();
} else {
sinon.assert.calledWith(logger.warn, sinon.match(/invalid task result/i));
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatch(/invalid task result/i);
}
}

View file

@ -10,10 +10,11 @@
* rescheduling, middleware application, etc.
*/
import { performance } from 'perf_hooks';
import Joi from 'joi';
import { intervalFromDate, intervalFromNow } from './lib/intervals';
import { Logger } from './types';
import { BeforeRunFunction } from './lib/middleware';
import { BeforeRunFunction, BeforeMarkRunningFunction } from './lib/middleware';
import {
CancelFunction,
CancellableTask,
@ -32,7 +33,7 @@ export interface TaskRunner {
cancel: CancelFunction;
markTaskAsRunning: () => Promise<boolean>;
run: () => Promise<RunResult>;
toString?: () => string;
toString: () => string;
}
interface Updatable {
@ -47,6 +48,7 @@ interface Opts {
instance: ConcreteTaskInstance;
store: Updatable;
beforeRun: BeforeRunFunction;
beforeMarkRunning: BeforeMarkRunningFunction;
}
/**
@ -62,8 +64,9 @@ export class TaskManagerRunner implements TaskRunner {
private instance: ConcreteTaskInstance;
private definitions: TaskDictionary<TaskDefinition>;
private logger: Logger;
private store: Updatable;
private bufferedTaskStore: Updatable;
private beforeRun: BeforeRunFunction;
private beforeMarkRunning: BeforeMarkRunningFunction;
/**
* Creates an instance of TaskManagerRunner.
@ -79,8 +82,9 @@ export class TaskManagerRunner implements TaskRunner {
this.instance = sanitizeInstance(opts.instance);
this.definitions = opts.definitions;
this.logger = opts.logger;
this.store = opts.store;
this.bufferedTaskStore = opts.store;
this.beforeRun = opts.beforeRun;
this.beforeMarkRunning = opts.beforeMarkRunning;
}
/**
@ -153,14 +157,20 @@ export class TaskManagerRunner implements TaskRunner {
* @returns {Promise<boolean>}
*/
public async markTaskAsRunning(): Promise<boolean> {
performance.mark('markTaskAsRunning_start');
const VERSION_CONFLICT_STATUS = 409;
const attempts = this.instance.attempts + 1;
const now = new Date();
const ownershipClaimedUntil = this.instance.retryAt;
const { taskInstance } = await this.beforeMarkRunning({
taskInstance: this.instance,
});
const attempts = taskInstance.attempts + 1;
const ownershipClaimedUntil = taskInstance.retryAt;
try {
const { id } = this.instance;
const { id } = taskInstance;
const timeUntilClaimExpires = howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil);
if (timeUntilClaimExpires < 0) {
@ -171,8 +181,8 @@ export class TaskManagerRunner implements TaskRunner {
);
}
this.instance = await this.store.update({
...this.instance,
this.instance = await this.bufferedTaskStore.update({
...taskInstance,
status: 'running',
startedAt: now,
attempts,
@ -198,13 +208,14 @@ export class TaskManagerRunner implements TaskRunner {
);
}
performanceStopMarkingTaskAsRunning();
return true;
} catch (error) {
performanceStopMarkingTaskAsRunning();
if (error.statusCode !== VERSION_CONFLICT_STATUS) {
throw error;
}
}
return false;
}
@ -263,7 +274,7 @@ export class TaskManagerRunner implements TaskRunner {
runAt = intervalFromDate(startedAt, this.instance.interval)!;
}
await this.store.update({
await this.bufferedTaskStore.update({
...this.instance,
runAt,
state,
@ -280,7 +291,7 @@ export class TaskManagerRunner implements TaskRunner {
private async processResultWhenDone(result: RunResult): Promise<RunResult> {
// not a recurring task: clean up by removing the task instance from store
try {
await this.store.remove(this.instance.id);
await this.bufferedTaskStore.remove(this.instance.id);
} catch (err) {
if (err.statusCode === 404) {
this.logger.warn(`Task cleanup of ${this} failed in processing. Was remove called twice?`);
@ -306,7 +317,7 @@ export class TaskManagerRunner implements TaskRunner {
return 'idle';
}
const maxAttempts = this.definition.maxAttempts || this.store.maxAttempts;
const maxAttempts = this.definition.maxAttempts || this.bufferedTaskStore.maxAttempts;
return this.instance.attempts < maxAttempts ? 'idle' : 'failed';
}
@ -352,3 +363,12 @@ function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance
function howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil: Date | null): number {
return ownershipClaimedUntil ? ownershipClaimedUntil.getTime() - Date.now() : 0;
}
function performanceStopMarkingTaskAsRunning() {
performance.mark('markTaskAsRunning_stop');
performance.measure(
'taskRunner.markTaskAsRunning',
'markTaskAsRunning_start',
'markTaskAsRunning_stop'
);
}

View file

@ -9,7 +9,6 @@ import sinon from 'sinon';
import uuid from 'uuid';
import { TaskDictionary, TaskDefinition, TaskInstance, TaskStatus } from './task';
import { FetchOpts, StoreOpts, OwnershipClaimingOpts, TaskStore } from './task_store';
import { mockLogger } from './test_utils';
import { savedObjectsClientMock } from 'src/core/server/mocks';
import { SavedObjectsSerializer, SavedObjectsSchema, SavedObjectAttributes } from 'src/core/server';
@ -77,6 +76,7 @@ describe('TaskStore', () => {
test('serializes the params and state', async () => {
const task = {
id: 'id',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
@ -99,7 +99,10 @@ describe('TaskStore', () => {
taskType: 'report',
user: undefined,
},
{}
{
id: 'id',
refresh: false,
}
);
expect(result).toEqual({
@ -326,233 +329,6 @@ describe('TaskStore', () => {
});
});
describe('fetchAvailableTasks', () => {
async function testFetchAvailableTasks({ opts = {}, hits = [] }: any = {}) {
const callCluster = sinon.spy(async (name: string, params?: any) => ({ hits: { hits } }));
const store = new TaskStore({
callCluster,
logger: mockLogger(),
definitions: taskDefinitions,
maxAttempts: 2,
serializer,
...opts,
});
const result = await store.fetchAvailableTasks();
sinon.assert.calledOnce(callCluster);
sinon.assert.calledWith(callCluster, 'search');
return {
result,
args: callCluster.args[0][1],
};
}
test('it returns normally with no tasks when the index does not exist.', async () => {
const callCluster = sinon.spy(async (name: string, params?: any) => ({ hits: { hits: [] } }));
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
definitions: taskDefinitions,
maxAttempts: 2,
savedObjectsRepository: savedObjectsClient,
});
const result = await store.fetchAvailableTasks();
sinon.assert.calledOnce(callCluster);
sinon.assert.calledWithMatch(callCluster, 'search', { ignoreUnavailable: true });
expect(result.length).toBe(0);
});
test('it filters tasks by supported types, maxAttempts, and runAt', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const { args } = await testFetchAvailableTasks({
opts: {
maxAttempts,
definitions: {
foo: {
type: 'foo',
title: '',
createTaskRunner: jest.fn(),
},
bar: {
type: 'bar',
title: '',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
},
},
});
expect(args).toMatchObject({
body: {
query: {
bool: {
must: [
{ term: { type: 'task' } },
{
bool: {
must: [
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{ term: { 'task.status': 'running' } },
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.interval' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
],
},
},
size: 10,
sort: {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'expression',
source: `doc['task.retryAt'].value || doc['task.runAt'].value`,
},
},
},
seq_no_primary_term: true,
},
});
});
test('it returns task objects', async () => {
const runAt = new Date();
const { result } = await testFetchAvailableTasks({
hits: [
{
_id: 'aaa',
_source: {
type: 'task',
task: {
runAt,
taskType: 'foo',
interval: undefined,
attempts: 0,
status: 'idle',
params: '{ "hello": "world" }',
state: '{ "baby": "Henhen" }',
user: 'jimbo',
scope: ['reporting'],
},
},
_seq_no: 1,
_primary_term: 2,
sort: ['a', 1],
},
{
_id: 'bbb',
_source: {
type: 'task',
task: {
runAt,
taskType: 'bar',
interval: '5m',
attempts: 2,
status: 'running',
params: '{ "shazm": 1 }',
state: '{ "henry": "The 8th" }',
user: 'dabo',
scope: ['reporting', 'ceo'],
},
},
_seq_no: 3,
_primary_term: 4,
sort: ['b', 2],
},
],
});
expect(result).toMatchObject([
{
attempts: 0,
id: 'aaa',
interval: undefined,
params: { hello: 'world' },
runAt,
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'idle',
taskType: 'foo',
user: 'jimbo',
},
{
attempts: 2,
id: 'bbb',
interval: '5m',
params: { shazm: 1 },
runAt,
scope: ['reporting', 'ceo'],
state: { henry: 'The 8th' },
status: 'running',
taskType: 'bar',
user: 'dabo',
},
]);
});
});
describe('claimAvailableTasks', () => {
async function testClaimAvailableTasks({
opts = {},
@ -927,7 +703,7 @@ describe('TaskStore', () => {
user: undefined,
ownerId: null,
},
{ version: '123' }
{ version: '123', refresh: false }
);
expect(result).toEqual({

View file

@ -70,6 +70,11 @@ export interface ClaimOwnershipResult {
docs: ConcreteTaskInstance[];
}
export interface BulkUpdateTaskFailureResult {
error: NonNullable<SavedObject['error']>;
task: ConcreteTaskInstance;
}
export interface UpdateByQueryResult {
updated: number;
version_conflicts: number;
@ -107,8 +112,6 @@ export class TaskStore {
this.definitions = opts.definitions;
this.serializer = opts.serializer;
this.savedObjectsRepository = opts.savedObjectsRepository;
this.fetchAvailableTasks = this.fetchAvailableTasks.bind(this);
}
/**
@ -128,7 +131,7 @@ export class TaskStore {
const savedObject = await this.savedObjectsRepository.create(
'task',
taskInstanceToAttributes(taskInstance),
{ id: taskInstance.id }
{ id: taskInstance.id, refresh: false }
);
return savedObjectToConcreteTaskInstance(savedObject);
@ -148,88 +151,6 @@ export class TaskStore {
});
}
/**
* Fetches tasks from the index, which are ready to be run.
* - runAt is now or past
* - id is not currently running in this instance of Kibana
* - has a type that is in our task definitions
*
* @param {TaskQuery} query
* @prop {string[]} types - Task types to be queried
* @prop {number} size - The number of task instances to retrieve
* @returns {Promise<ConcreteTaskInstance[]>}
*/
public async fetchAvailableTasks(): Promise<ConcreteTaskInstance[]> {
const { docs } = await this.search({
query: {
bool: {
must: [
// Either a task with idle status and runAt <= now or
// status running with a retryAt <= now.
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{ term: { 'task.status': 'running' } },
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
// Either task has an interval or the attempts < the maximum configured
{
bool: {
should: [
{ exists: { field: 'task.interval' } },
...Object.entries(this.definitions).map(([type, definition]) => ({
bool: {
must: [
{ term: { 'task.taskType': type } },
{
range: {
'task.attempts': {
lt: definition.maxAttempts || this.maxAttempts,
},
},
},
],
},
})),
],
},
},
],
},
},
size: 10,
sort: {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'expression',
source: `doc['task.retryAt'].value || doc['task.runAt'].value`,
},
},
},
seq_no_primary_term: true,
});
return docs;
}
/**
* Claims available tasks from the index, which are ready to be run.
* - runAt is now or past
@ -376,6 +297,7 @@ export class TaskStore {
return docs;
}
/**
* Updates the specified doc in the index, returning the doc
* with its version up to date.
@ -388,7 +310,10 @@ export class TaskStore {
'task',
doc.id,
taskInstanceToAttributes(doc),
{ version: doc.version }
{
refresh: false,
version: doc.version,
}
);
return savedObjectToConcreteTaskInstance(updatedSavedObject);

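For reference, change 2 of the PR description amounts to passing refresh: false in these saved-objects calls. A self-contained sketch of the call shapes, using a minimal stand-in for the repository interface (ids, attributes, and the interface itself are illustrative, not Kibana's real types):

interface MinimalSavedObjectsRepository {
  create(
    type: string,
    attributes: Record<string, unknown>,
    options?: { id?: string; refresh?: boolean | 'wait_for' }
  ): Promise<unknown>;
  update(
    type: string,
    id: string,
    attributes: Record<string, unknown>,
    options?: { version?: string; refresh?: boolean | 'wait_for' }
  ): Promise<unknown>;
}

async function writeWithoutWaitingForRefresh(repo: MinimalSavedObjectsRepository) {
  // refresh: false asks Elasticsearch not to force an index refresh for this write,
  // trading immediate searchability for lower write latency on Task Manager's hot path.
  await repo.create('task', { taskType: 'report' }, { id: 'my-task-id', refresh: false });
  await repo.update('task', 'my-task-id', { status: 'running' }, { version: '123', refresh: false });
}
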
View file

@ -8,8 +8,6 @@
* A handful of helper functions for testing the task manager.
*/
import sinon from 'sinon';
// Caching this here to avoid setTimeout mocking affecting our tests.
const nativeTimeout = setTimeout;
@ -18,10 +16,10 @@ const nativeTimeout = setTimeout;
*/
export function mockLogger() {
return {
info: sinon.stub(),
debug: sinon.stub(),
warn: sinon.stub(),
error: sinon.stub(),
info: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};
}

View file

@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
process.env.ALLOW_PERFORMANCE_HOOKS_IN_TASK_MANAGER = true;
require('@kbn/plugin-helpers').babelRegister();
require('@kbn/test').startServersCli(
require.resolve('../test/functional/config.js'),

View file

@ -4,16 +4,13 @@
* you may not use this file except in compliance with the Elastic License.
*/
import uuid from 'uuid';
import _ from 'lodash';
import stats from 'stats-lite';
import prettyMilliseconds from 'pretty-ms';
import { performance, PerformanceObserver } from 'perf_hooks';
import { initRoutes } from './init_routes';
function avg(items) {
return (
items.reduce((sum, val) => {
return sum + val;
}, 0) / items.length
);
}
export default function TaskManagerPerformanceAPI(kibana) {
return new kibana.Plugin({
name: 'perfTask',
@ -27,35 +24,46 @@ export default function TaskManagerPerformanceAPI(kibana) {
init(server) {
const taskManager = server.plugins.task_manager;
const performanceState = {
runningAverageTasks: 0,
averagesTaken: [],
runningAverageLeadTime: -1,
averagesTakenLeadTime: [],
leadTimeQueue: [],
};
const performanceState = resetPerfState({});
let lastFlush = new Date();
function flushPerfStats() {
setTimeout(flushPerfStats, 5000);
const prevFlush = lastFlush;
lastFlush = new Date();
setInterval(() => {
const tasks = performanceState.leadTimeQueue.length;
console.log(`I have processed ${tasks} tasks in the past 5s`);
const title = `[Perf${performanceState.capturing ? ' (capturing)' : ''}]`;
const seconds = parseInt((lastFlush - prevFlush) / 1000);
console.log(
`${title} I have processed ${tasks} tasks in the past ${seconds}s (${tasks /
seconds} per second)`
);
if (tasks > 0) {
const latestAverage = avg(performanceState.leadTimeQueue.splice(0, tasks));
const latestAverage = avg(performanceState.leadTimeQueue.splice(0, tasks)).mean;
performanceState.averagesTakenLeadTime.push(latestAverage);
performanceState.averagesTaken.push(tasks);
if (performanceState.averagesTakenLeadTime.length > 1) {
performanceState.runningAverageLeadTime = avg(performanceState.averagesTakenLeadTime);
performanceState.runningAverageTasks = avg(performanceState.averagesTaken);
performanceState.runningAverageLeadTime = avg(
performanceState.averagesTakenLeadTime
).mean;
performanceState.runningAverageTasksPerSecond =
avg(performanceState.averagesTaken).mean / 5;
} else {
performanceState.runningAverageLeadTime = latestAverage;
performanceState.runningAverageTasks = tasks;
performanceState.runningAverageTasksPerSecond = tasks / 5;
}
}
}, 5000);
}
setTimeout(flushPerfStats, 5000);
const title = 'Perf Test Task';
taskManager.registerTaskDefinitions({
performanceTestTask: {
title: 'Perf Test Task',
title,
description: 'A task for stress testing task_manager.',
timeout: '1m',
@ -64,27 +72,41 @@ export default function TaskManagerPerformanceAPI(kibana) {
async run() {
const { params, state } = taskInstance;
const runAt = millisecondsFromNow(5000);
const counter = state.counter ? state.counter : 1;
const now = Date.now();
const leadTime = now - taskInstance.runAt;
performanceState.leadTimeQueue.push(leadTime);
const counter = (state.counter ? 1 + state.counter : 1);
// schedule to run next cycle as soon as possible
const runAt = calRunAt(params, counter);
const stateUpdated = {
...state,
counter
counter: counter + 1,
};
if(params.trackExecutionTimeline) {
stateUpdated.timeline = stateUpdated.timeline || [];
stateUpdated.timeline.push({
owner: taskInstance.owner.split('-')[0],
counter,
leadTime,
ranAt: now
});
if (params.trackExecutionTimeline && state.perf && state.perf.id) {
performance.mark(`perfTask_run_${state.perf.id}_${counter}`);
performance.measure(
'perfTask.markUntilRun',
`perfTask_markAsRunning_${state.perf.id}_${counter}`,
`perfTask_run_${state.perf.id}_${counter}`
);
if (counter === 1) {
performance.measure(
'perfTask.firstRun',
`perfTask_schedule_${state.perf.id}`,
`perfTask_run_${state.perf.id}_${counter}`
);
performance.measure(
'perfTask.firstMarkAsRunningTillRan',
`perfTask_markAsRunning_${state.perf.id}_${counter}`,
`perfTask_run_${state.perf.id}_${counter}`
);
}
}
return {
state: stateUpdated,
runAt,
@ -95,17 +117,249 @@ export default function TaskManagerPerformanceAPI(kibana) {
},
});
initRoutes(server, performanceState);
taskManager.addMiddleware({
async beforeSave({ taskInstance, ...opts }) {
const modifiedInstance = {
...taskInstance,
};
if (taskInstance.params && taskInstance.params.trackExecutionTimeline) {
modifiedInstance.state = modifiedInstance.state || {};
modifiedInstance.state.perf = modifiedInstance.state.perf || {};
modifiedInstance.state.perf.id = uuid.v4().replace(/-/gi, '_');
performance.mark(`perfTask_schedule_${modifiedInstance.state.perf.id}`);
}
return {
...opts,
taskInstance: modifiedInstance,
};
},
async beforeMarkRunning({ taskInstance, ...opts }) {
const modifiedInstance = {
...taskInstance,
};
if (
modifiedInstance.state &&
modifiedInstance.state.perf &&
modifiedInstance.state.perf.id
) {
const { counter = 1 } = modifiedInstance.state;
performance.mark(`perfTask_markAsRunning_${modifiedInstance.state.perf.id}_${counter}`);
if (counter === 1) {
performance.measure(
'perfTask.firstMarkAsRunning',
`perfTask_schedule_${modifiedInstance.state.perf.id}`,
`perfTask_markAsRunning_${modifiedInstance.state.perf.id}_${counter}`
);
} else if (counter > 1) {
performance.measure(
'perfTask.runUntilNextMarkAsRunning',
`perfTask_run_${modifiedInstance.state.perf.id}_${counter - 1}`,
`perfTask_markAsRunning_${modifiedInstance.state.perf.id}_${counter}`
);
}
}
return {
...opts,
taskInstance: modifiedInstance,
};
},
});
const perfApi = {
capture() {
resetPerfState(performanceState);
performanceState.capturing = true;
performance.mark('perfTest.start');
},
endCapture() {
return new Promise(resolve => {
performanceState.performance.summarize.push([resolve, perfApi.summarize]);
performance.mark('perfTest.end');
performance.measure('perfTest.duration', 'perfTest.start', 'perfTest.end');
});
},
summarize(perfTestDuration) {
const {
runningAverageTasksPerSecond,
runningAverageLeadTime,
performance,
} = performanceState;
const {
numberOfTasksRanOverall,
elasticsearchApiCalls,
activityDuration,
sleepDuration,
cycles,
claimAvailableTasksNoTasks,
claimAvailableTasksNoAvailableWorkers,
taskPoolAttemptToRun,
taskRunnerMarkTaskAsRunning
} = performance;
const perfRes = {
perfTestDuration: prettyMilliseconds(perfTestDuration),
runningAverageTasksPerSecond,
runningAverageLeadTime,
numberOfTasksRanOverall,
claimAvailableTasksNoTasks,
claimAvailableTasksNoAvailableWorkers,
elasticsearchApiCalls: _.mapValues(elasticsearchApiCalls, avg),
sleepDuration: prettyMilliseconds(stats.sum(sleepDuration)),
activityDuration: prettyMilliseconds(stats.sum(activityDuration)),
cycles,
taskPoolAttemptToRun: avg(taskPoolAttemptToRun),
taskRunnerMarkTaskAsRunning: avg(taskRunnerMarkTaskAsRunning),
};
resetPerfState(performanceState);
return perfRes;
},
};
initRoutes(server, perfApi);
},
});
}
function millisecondsFromNow(ms) {
if (!ms) {
return;
function calRunAt(params, counter) {
const runAt = counter === 1 ? new Date(params.startAt) : new Date();
return runAt.getTime() < params.runUntil ? runAt : undefined;
}
function avg(items) {
const mode = stats.mode(items);
return {
mean: parseInt(stats.mean(items)),
range: {
min: parseInt(typeof mode === 'number' ? mode : _.min([...mode])),
max: parseInt(typeof mode === 'number' ? mode : _.max([...mode])),
},
};
}
function resetPerfState(target) {
if(target.performanceObserver) {
target.performanceObserver.disconnect();
}
const dt = new Date();
dt.setTime(dt.getTime() + ms);
return dt;
const performanceState = Object.assign(target, {
capturing: false,
runningAverageTasksPerSecond: 0,
averagesTaken: [],
runningAverageLeadTime: -1,
averagesTakenLeadTime: [],
leadTimeQueue: [],
performance: {
numberOfTasksRanOverall: 0,
cycles: {
fillPoolStarts: 0,
fillPoolCycles: 0,
fillPoolBail: 0,
fillPoolBailNoTasks: 0,
},
claimAvailableTasksNoTasks: 0,
claimAvailableTasksNoAvailableWorkers: 0,
elasticsearchApiCalls: {
timeUntilFirstRun: [],
timeUntilFirstMarkAsRun: [],
firstMarkAsRunningTillRan: [],
timeFromMarkAsRunTillRun: [],
timeFromRunTillNextMarkAsRun: [],
claimAvailableTasks: [],
},
activityDuration: [],
sleepDuration: [],
taskPollerActivityDurationPreScheduleComplete: [],
taskPoolAttemptToRun: [],
taskRunnerMarkTaskAsRunning: [],
summarize: []
},
});
performanceState.performanceObserver = new PerformanceObserver((list, observer) => {
list.getEntries().forEach(entry => {
const { name, duration } = entry;
switch (name) {
// Elasticsearch Api Calls
case 'perfTask.firstRun':
performanceState.performance.elasticsearchApiCalls.timeUntilFirstRun.push(duration);
break;
case 'perfTask.firstMarkAsRunning':
performanceState.performance.elasticsearchApiCalls.timeUntilFirstMarkAsRun.push(duration);
break;
case 'perfTask.firstMarkAsRunningTillRan':
performanceState.performance.elasticsearchApiCalls.firstMarkAsRunningTillRan.push(duration);
break;
case 'perfTask.markUntilRun':
performanceState.performance.elasticsearchApiCalls.timeFromMarkAsRunTillRun.push(duration);
break;
case 'perfTask.runUntilNextMarkAsRunning':
performanceState.performance.elasticsearchApiCalls.timeFromRunTillNextMarkAsRun.push(duration);
break;
case 'claimAvailableTasks':
performanceState.performance.elasticsearchApiCalls.claimAvailableTasks.push(duration);
break;
case 'TaskPoller.sleepDuration':
performanceState.performance.sleepDuration.push(duration);
break;
case 'fillPool.activityDurationUntilNoTasks':
performanceState.performance.activityDuration.push(duration);
break;
case 'fillPool.activityDurationUntilExhaustedCapacity':
performanceState.performance.activityDuration.push(duration);
break;
case 'fillPool.bailExhaustedCapacity':
performanceState.performance.cycles.fillPoolBail++;
break;
case 'fillPool.bailNoTasks':
performanceState.performance.cycles.fillPoolBail++;
performanceState.performance.cycles.fillPoolBailNoTasks++;
break;
case 'fillPool.start':
performanceState.performance.cycles.fillPoolStarts++;
break;
case 'fillPool.cycle':
performanceState.performance.cycles.fillPoolCycles++;
break;
break;
case 'claimAvailableTasks.noTasks':
performanceState.performance.claimAvailableTasksNoTasks++;
break;
case 'claimAvailableTasks.noAvailableWorkers':
performanceState.performance.claimAvailableTasksNoAvailableWorkers++;
break;
case 'taskPool.attemptToRun':
performanceState.performance.taskPoolAttemptToRun.push(duration);
break;
case 'taskRunner.markTaskAsRunning':
performanceState.performance.taskRunnerMarkTaskAsRunning.push(duration);
break;
case 'perfTest.duration':
observer.disconnect();
const { summarize } = performanceState.performance;
if(summarize && summarize.length) {
summarize.splice(0, summarize.length).forEach(([resolve, summarize]) => {
resolve(summarize(duration));
});
}
break;
default:
if (name.startsWith('perfTask_run_')) {
performanceState.performance.numberOfTasksRanOverall++;
}
}
});
});
performanceState.performanceObserver.observe({ entryTypes: ['measure', 'mark'] });
return performanceState;
}

View file

@ -5,6 +5,7 @@
*/
import Joi from 'joi';
import { range, chunk } from 'lodash';
const scope = 'perf-testing';
export function initRoutes(server, performanceState) {
@ -18,32 +19,56 @@ export function initRoutes(server, performanceState) {
payload: Joi.object({
tasksToSpawn: Joi.number().required(),
durationInSeconds: Joi.number().required(),
trackExecutionTimeline: Joi.boolean().default(false).required(),
trackExecutionTimeline: Joi.boolean()
.default(false)
.required(),
}),
},
},
async handler(request) {
const { tasksToSpawn, durationInSeconds, trackExecutionTimeline } = request.payload;
const tasks = [];
performanceState.capture();
for (let taskIndex = 0; taskIndex < tasksToSpawn; taskIndex++) {
tasks.push(
await taskManager.schedule(
{
taskType: 'performanceTestTask',
params: { taskIndex, trackExecutionTimeline },
scope: [scope],
},
{ request }
const { tasksToSpawn, durationInSeconds, trackExecutionTimeline } = request.payload;
const startAt = millisecondsFromNow(5000).getTime();
await chunk(range(tasksToSpawn), 200)
.map(chunkOfTasksToSpawn => () =>
Promise.all(
chunkOfTasksToSpawn.map(taskIndex =>
taskManager.schedule(
{
taskType: 'performanceTestTask',
params: {
startAt,
taskIndex,
trackExecutionTimeline,
runUntil: millisecondsFromNow(durationInSeconds * 1000).getTime(),
},
scope: [scope],
},
{ request }
)
)
)
);
}
)
.reduce((chain, nextExecutor) => {
return chain.then(() => nextExecutor());
}, Promise.resolve());
return new Promise(resolve => {
setTimeout(() => {
resolve(performanceState);
}, durationInSeconds * 1000);
performanceState.endCapture().then(resolve);
}, durationInSeconds * 1000 + 10000 /* wait extra 10s to drain queue */);
});
},
});
}
function millisecondsFromNow(ms) {
if (!ms) {
return;
}
const dt = new Date();
dt.setTime(dt.getTime() + ms);
return dt;
}

View file

@ -7,6 +7,10 @@
},
"license": "Apache-2.0",
"dependencies": {
"joi": "^13.5.2"
"lodash": "^4.17.15",
"uuid": "3.3.2",
"joi": "^13.5.2",
"stats-lite": "2.2.0",
"pretty-ms": "5.0.0"
}
}

View file

@ -10,23 +10,151 @@ export default function({ getService }: { getService: (service: string) => any }
const log = getService('log');
const supertest = getService('supertest');
const params = { tasksToSpawn: 100, trackExecutionTimeline: true, durationInSeconds: 60 };
describe('stressing task manager', () => {
it('should run 10 tasks every second for a minute', async () => {
const { runningAverageTasks, runningAverageLeadTime } = await supertest
it(`should run ${params.tasksToSpawn} tasks over ${params.durationInSeconds} seconds`, async () => {
const {
runningAverageTasksPerSecond,
runningAverageLeadTime,
// how often things happen in Task Manager
cycles: { fillPoolStarts, fillPoolCycles, fillPoolBail, fillPoolBailNoTasks },
claimAvailableTasksNoTasks,
claimAvailableTasksNoAvailableWorkers,
numberOfTasksRanOverall,
// how long it takes to talk to Elasticsearch
elasticsearchApiCalls: {
timeUntilFirstMarkAsRun,
firstMarkAsRunningTillRan,
timeFromMarkAsRunTillRun,
timeFromRunTillNextMarkAsRun,
claimAvailableTasks,
},
// durations in Task Manager
perfTestDuration,
taskPoolAttemptToRun,
taskRunnerMarkTaskAsRunning,
sleepDuration,
activityDuration,
} = await supertest
.post('/api/perf_tasks')
.set('kbn-xsrf', 'xxx')
.send({ tasksToSpawn: 20, trackExecutionTimeline: true, durationInSeconds: 60 })
.send(params)
.expect(200)
.then((response: any) => response.body);
log.debug(`Stress Test Result:`);
log.debug(`Average number of tasks executed per second: ${runningAverageTasks}`);
log.debug(
`Average time it took from the moment a task's scheduled time was reached, until Task Manager picked it up: ${runningAverageLeadTime}`
log.info(cyan(`Stress Test Result:`));
log.info(
`Average number of tasks executed per second: ${bright(runningAverageTasksPerSecond)}`
);
log.info(
`Average time between a task's "runAt" scheduled time and the time it actually ran: ${bright(
runningAverageLeadTime
)}`
);
expect(runningAverageTasks).to.be.greaterThan(0);
if (params.trackExecutionTimeline) {
log.info(
`Overall number of tasks ran in ${bright(params.durationInSeconds)} seconds: ${bright(
numberOfTasksRanOverall
)}`
);
log.info(`Average time between stages:`);
log.info(
`Schedule ---[${descMetric(
timeUntilFirstMarkAsRun
)}]--> first markAsRunning ---[${descMetric(firstMarkAsRunningTillRan)}]--> first run`
);
log.info(
`markAsRunning ---[${descMetric(timeFromMarkAsRunTillRun)}]--> run ---[${descMetric(
timeFromRunTillNextMarkAsRun
)}]---> next markAsRunning`
);
log.info(`Duration of Perf Test: ${bright(perfTestDuration)}`);
log.info(`Activity within Task Poller: ${bright(activityDuration)}`);
log.info(`Inactivity due to Sleep: ${bright(sleepDuration)}`);
log.info(`Polling Cycles: ${colorizeCycles(fillPoolStarts, fillPoolCycles, fillPoolBail)}`);
if (fillPoolBail > 0) {
log.info(` ⮑ Bailed due to:`);
if (fillPoolBailNoTasks > 0) {
log.info(` ⮑ No Tasks To Process:`);
if (claimAvailableTasksNoTasks > 0) {
log.info(`${claimAvailableTasksNoTasks} Times, due to No Tasks Claimed`);
}
if (claimAvailableTasksNoAvailableWorkers > 0) {
log.info(
`${claimAvailableTasksNoAvailableWorkers} Times, due to having No Available Worker Capacity`
);
}
}
if (fillPoolBail - fillPoolBailNoTasks > 0) {
log.info(
` ⮑ Exhausted Available Workers due to on going Task runs ${fillPoolBail -
fillPoolBailNoTasks}`
);
}
}
log.info(
`average duration taken to Claim Available Tasks: ${descMetric(claimAvailableTasks)}`
);
log.info(
`average duration taken to Mark claimed Tasks as Running in Task Pool: ${descMetric(
taskPoolAttemptToRun
)}`
);
log.info(
`average duration taken to Mark individual Tasks as Running in Task Runner: ${descMetric(
taskRunnerMarkTaskAsRunning
)}`
);
}
expect(runningAverageTasksPerSecond).to.be.greaterThan(0);
expect(runningAverageLeadTime).to.be.greaterThan(0);
});
});
}
function descMetric(metric: { mean: number; range: { min: number; max: number } }): string {
return `${colorize(metric.mean)}ms ${dim(`(`)}${colorize(metric.range.min)}${dim(
`ms - `
)}${colorize(metric.range.max)}${dim(`ms)`)}`;
}
function colorize(avg: number) {
if (!avg) {
return red('?');
}
return avg < 500 ? green(`${avg}`) : avg < 1000 ? cyan(`${avg}`) : red(`${avg}`);
}
function colorizeCycles(fillPoolStarts: number, fillPoolCycles: number, fillPoolBail: number) {
const perc = (fillPoolCycles * 100) / fillPoolStarts;
const colorFunc = perc >= 100 ? green : perc >= 50 ? cyan : red;
return (
`ran ` +
bright(`${fillPoolStarts}`) +
` cycles, of which ` +
colorFunc(`${fillPoolCycles}`) +
` were reran before bailing`
);
}
function cyan(str: string) {
return `\x1b[36m${str}\x1b[0m`;
}
function red(str: string) {
return `\x1b[31m${str}\x1b[0m`;
}
function green(str: string) {
return `\x1b[32m${str}\x1b[0m`;
}
function dim(str: string) {
return `\x1b[2m${str}\x1b[0m`;
}
function bright(str: string | number) {
return `\x1b[1m${str}\x1b[0m`;
}

View file

@ -5952,6 +5952,14 @@ babel-plugin-emotion@^9.2.11:
source-map "^0.5.7"
touch "^2.0.1"
babel-plugin-filter-imports@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/babel-plugin-filter-imports/-/babel-plugin-filter-imports-3.0.0.tgz#a849683837ad29960da17492fb32789ab6b09a11"
integrity sha512-p/chjzVTgCxUqyLM0q/pfWVZS7IJTwGQMwNg0LOvuQpKiTftQgZDtkGB8XvETnUw19rRcL7bJCTopSwibTN2tA==
dependencies:
"@babel/types" "^7.4.0"
lodash "^4.17.11"
babel-plugin-istanbul@^5.1.0:
version "5.2.0"
resolved "https://registry.yarnpkg.com/babel-plugin-istanbul/-/babel-plugin-istanbul-5.2.0.tgz#df4ade83d897a92df069c4d9a25cf2671293c854"