[Task Manager] Adds runNow api to Task Manager (#51601)

Adds a `runNow` api to Task Manager, allowing us to force the refresh of a recurring task.

This PR includes a couple of sustainability changes as well as the feature itself.

1. **Declarative query composition.** At the moment the queries in the TaskStore are huge JSON objects that are hard to maintain and understand. This PR introduces a pattern where the different parts of the query are composed out of type-checked functions, making them easier to maintain and to construct dynamically as needs change. _This was included in this PR because the **markAvailableTasksAsClaimed** query needs different query clauses depending on whether there are specific Tasks we wish to claim first (see the sketch after this list)._

2. **Refactoring of the Task Poller** As the `runNow` api is introduced, Task Manager's lifecycle ends up in an awkward state where it has both a _pull_ model, where timeouts and callbacks interact without having to respond to any external requests, and a _push_ model, where requests are made to the new `runNow` api. Balancing these two proved error-prone, hard to maintain, and risked _lossy_ behaviour where requests were accidentally dropped. To address this, the TaskPoller has been refactored using RxJS observables, remodelling the existing _pull_ mechanism as a _push_ mechanism so Task Manager can _respond_ to both _polling_ calls and _runNow_ calls in a similar fashion.

And of course, the main feature of this PR:

3. **runNow api** An api on TaskManager that takes a _task ID_ and attempts to run the task. The call returns a promise which resolves with a result once the task has completed successfully, or rejects with an error describing why it could not be run.
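
To give a flavour of the first change, here is a trimmed sketch of how the claim query is now composed out of the new clause builders (adapted from the query-composition tests added in this PR; the import paths and surrounding constants are illustrative):

```ts
import { asUpdateByQuery, mustBeAllOf, shouldBeOneOf } from './queries/query_clauses';
import {
  updateFields,
  IdleTaskWithExpiredRunAt,
  RunningOrClaimingTaskWithExpiredRetryAt,
  SortByRunAtAndRetryAt,
} from './queries/mark_available_tasks_as_claimed';

const taskManagerId = 'kibana:3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';

// A task is claimable if it is idle with an expired runAt, or stuck in
// running/claiming with an expired retryAt; claiming it marks it as ours.
const claimQuery = asUpdateByQuery({
  query: mustBeAllOf(
    shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
  ),
  update: updateFields({
    ownerId: taskManagerId,
    status: 'claiming',
    retryAt: claimOwnershipUntil,
  }),
  sort: SortByRunAtAndRetryAt,
});
```

Each building block is an ordinary typed value or function, so the same pieces can be recombined when, for example, `runNow` needs specific task ids to be claimed first.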
36 changed files with 3011 additions and 557 deletions

View file

@ -34,7 +34,7 @@ beforeAll(() => {
state: {},
attempts: 0,
ownerId: '',
status: 'running' as TaskStatus,
status: TaskStatus.Running,
startedAt: new Date(),
scheduledAt: new Date(),
retryAt: new Date(Date.now() + 5 * 60 * 1000),

View file

@ -9,6 +9,7 @@ import { AlertsClient } from './alerts_client';
import { savedObjectsClientMock, loggingServiceMock } from '../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../task_manager/task_manager.mock';
import { alertTypeRegistryMock } from './alert_type_registry.mock';
import { TaskStatus } from '../../task_manager';
const taskManager = taskManagerMock.create();
const alertTypeRegistry = alertTypeRegistryMock.create();
@ -119,7 +120,7 @@ describe('create()', () => {
taskType: 'alerting:123',
scheduledAt: new Date(),
attempts: 1,
status: 'idle',
status: TaskStatus.Idle,
runAt: new Date(),
startedAt: null,
retryAt: null,
@ -351,7 +352,7 @@ describe('create()', () => {
taskType: 'alerting:123',
scheduledAt: new Date(),
attempts: 1,
status: 'idle',
status: TaskStatus.Idle,
runAt: new Date(),
startedAt: null,
retryAt: null,
@ -749,7 +750,7 @@ describe('create()', () => {
taskType: 'alerting:123',
scheduledAt: new Date(),
attempts: 1,
status: 'idle',
status: TaskStatus.Idle,
runAt: new Date(),
startedAt: null,
retryAt: null,
@ -830,7 +831,7 @@ describe('enable()', () => {
id: 'task-123',
scheduledAt: new Date(),
attempts: 0,
status: 'idle',
status: TaskStatus.Idle,
runAt: new Date(),
state: {},
params: {},
@ -907,7 +908,7 @@ describe('enable()', () => {
id: 'task-123',
scheduledAt: new Date(),
attempts: 0,
status: 'idle',
status: TaskStatus.Idle,
runAt: new Date(),
state: {},
params: {},

View file

@ -7,7 +7,7 @@
import sinon from 'sinon';
import { schema } from '@kbn/config-schema';
import { AlertExecutorOptions } from '../types';
import { ConcreteTaskInstance } from '../../../task_manager';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager';
import { TaskRunnerContext, TaskRunnerFactory } from './task_runner_factory';
import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks';
import {
@ -30,7 +30,7 @@ beforeAll(() => {
mockedTaskInstance = {
id: '',
attempts: 0,
status: 'running',
status: TaskStatus.Running,
version: '123',
runAt: new Date(),
scheduledAt: new Date(),

View file

@ -265,6 +265,23 @@ The danger is that in such a situation, a Task with that same `id` might already
To achieve this you should use the `ensureScheduled` api, which has the exact same behavior as `schedule`, except that it allows the scheduling of a Task with an `id` that's already assigned to another Task; it will assume that the existing Task is the one you wished to `schedule`, treating this as a successful operation.
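A hedged sketch of what that looks like from a consuming plugin (the task fields below follow the task instance shape used elsewhere in this document; the specific values are made up):

```ts
const taskManager = server.plugins.task_manager;

// Safe to call on every startup: if a task with this id already exists,
// the existing task is treated as the one we asked for.
await taskManager.ensureScheduled({
  id: 'my-plugin:cleanup',
  taskType: 'cleanup',
  interval: '5m',
  params: {},
  state: {},
});
```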
### runNow
Using `runNow` you can instruct TaskManager to run an existing task on-demand, without waiting for its scheduled time to be reached.
```js
const taskManager = server.plugins.task_manager;
try {
  const taskRunResult = await taskManager.runNow('91760f10-ba42-de9799');
  // If no error is thrown, the task has completed successfully.
} catch (err) {
  // If running the task failed, the promise rejects with an appropriate error message.
  // For example, if the requested task doesn't exist: `Error: failed to run task "91760f10-ba42-de9799" as it does not exist`
  // Or if, for example, the task is already running: `Error: failed to run task "91760f10-ba42-de9799" as it is currently running`
}
```
### more options
More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time.
@ -305,6 +322,37 @@ server.plugins.task_manager.addMiddleware({
});
```
## Task Poller: polling for work
TaskManager used to work in a `pull` model, but it now needs to support both `push` and `pull`, so it has been remodeled internally to support a single `push` model.
Task Manager's _push_ mechanism is driven by the following operations:
1. A polling interval has been reached.
2. A new Task is scheduled.
3. A Task is run using `runNow`.
The polling interval is straightforward: TaskPoller is configured to emit an event at a fixed interval.
That said, if there are no workers available, we want to ignore these events, so we throttle the interval on worker availability.
Whenever a user uses the `schedule` api to schedule a new Task, we want to trigger an early poll in order to respond to the newly scheduled task as soon as possible, but again only if there are available workers, so this is throttled too.
When a `runNow` call is made we need to force a poll, as the user is now waiting on the result of the `runNow` call, but there is a complexity here: we can't simply force polling (there might not be any worker capacity, and a polling cycle may already be running), yet we also can't throttle, as we can't afford to "drop" these requests, so we have to buffer them.
We now want to respond to all three of these push events, but we still need to balance against our worker capacity, so if there are too many requests buffered, we only `take` as many requests as we have capacity to handle.
Luckily, `Polling Interval` and `Task Scheduled` simply denote a request to "poll for work as soon as possible", unlike `Run Task Now`, which also means "poll for these specific tasks", so our worker capacity only needs to be applied to `Run Task Now`.
We achieve this model by buffering requests into a queue backed by a Set (which removes duplicates). As we don't want an unbounded queue in our system, we limit the size of this queue (configurable via the `xpack.task_manager.request_capacity` setting, defaulting to 1,000 requests), which forces us to throw an error once this cap is reached, and on all subsequent calls to `runNow`, until the queue drains below the cap.
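
A minimal sketch of that take-up-to-capacity behaviour, using the `pullFromSet` helper added in this PR (the ids and worker count below are made up):

```ts
import { pullFromSet } from './lib/pull_from_set';

// Three runNow requests have been buffered, but only two workers are free.
const bufferedTaskIds = new Set(['task-1', 'task-2', 'task-3']);
const availableWorkers = 2;

// Pulls at most `availableWorkers` ids, removing them from the buffer as it goes.
const claimTheseTasksFirst = pullFromSet(bufferedTaskIds, availableWorkers);
// claimTheseTasksFirst === ['task-1', 'task-2']
// bufferedTaskIds      === Set { 'task-3' }
```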
Our current model, then, is this:
```
Polling Interval --> filter(availableWorkers > 0) - mapTo([]) --------\\
Task Scheduled   --> filter(availableWorkers > 0) - mapTo([]) --------||==>Set([]+[]+[`1`,`2`]) ==> work([`1`,`2`])
Run Task `1` Now --\                                                  //
                    ----> buffer(availableWorkers > 0) -- [`1`,`2`] -//
Run Task `2` Now --/
```
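
A heavily simplified RxJS sketch of that composition (the real `createTaskPoller` added in this PR also buffers `runNow` requests into the bounded Set described above and serialises work with `concatMap`; `availableWorkers` and `pollForWork` are stand-ins):

```ts
import { Subject, interval, merge } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const pollInterval = 3000;
const availableWorkers = () => 10; // stand-in for TaskPool.availableWorkers
const pollForWork = async (...claimTasksById: string[]) => {
  // the real poller runs fillPool(claimAvailableTasks, createTaskRunner, pool.run) here
};

// On-demand requests made by runNow, each carrying the id of the task to claim first.
const runNowRequests$ = new Subject<string>();

const pollForWork$ = merge(
  // polling-interval events only matter while there is worker capacity
  interval(pollInterval).pipe(
    filter(() => availableWorkers() > 0),
    map((): string[] => [])
  ),
  // runNow requests must never be dropped, so they are buffered rather than filtered
  runNowRequests$.pipe(map(id => [id]))
);

pollForWork$.subscribe(claimTasksById => pollForWork(...claimTasksById));
```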
## Limitations in v1.0
In v1, the system only understands 1 minute increments (e.g. '1m', '7m'). Tasks which need something more robust will need to specify their own "runAt" in their run method's return value.

View file

@ -36,8 +36,13 @@ export function taskManager(kibana: any) {
.default(3),
poll_interval: Joi.number()
.description('How often, in milliseconds, the task manager will look for more work.')
.min(1000)
.min(100)
.default(3000),
request_capacity: Joi.number()
.description('How many requests can Task Manager buffer before it rejects new requests.')
.min(1)
// a nice round contrived number, feel free to change as we learn how it behaves
.default(1000),
index: Joi.string()
.description('The name of the index used to store task information.')
.default('.kibana_task_manager')

View file

@ -20,7 +20,7 @@ describe('fillPool', () => {
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
const converter = _.identity;
await fillPool(run, fetchAvailableTasks, converter);
await fillPool(fetchAvailableTasks, converter, run);
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3, 4, 5]);
});
@ -35,7 +35,7 @@ describe('fillPool', () => {
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = _.identity;
await fillPool(run, fetchAvailableTasks, converter);
await fillPool(fetchAvailableTasks, converter, run);
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]);
});
@ -50,7 +50,7 @@ describe('fillPool', () => {
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();
await fillPool(run, fetchAvailableTasks, converter);
await fillPool(fetchAvailableTasks, converter, run);
expect(_.flattenDeep(run.args)).toEqual(['1', '2', '3']);
});
@ -63,7 +63,7 @@ describe('fillPool', () => {
try {
const fetchAvailableTasks = async () => Promise.reject('fetch is not working');
await fillPool(run, fetchAvailableTasks, converter);
await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
expect(err.toString()).toBe('fetch is not working');
expect(run.called).toBe(false);
@ -82,7 +82,7 @@ describe('fillPool', () => {
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
await fillPool(run, fetchAvailableTasks, converter);
await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
expect(err.toString()).toBe('run is not working');
}
@ -101,7 +101,7 @@ describe('fillPool', () => {
throw new Error(`can not convert ${x}`);
};
await fillPool(run, fetchAvailableTasks, converter);
await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
expect(err.toString()).toBe('Error: can not convert 1');
}

View file

@ -5,6 +5,7 @@
*/
import { performance } from 'perf_hooks';
import { after } from 'lodash';
import { TaskPoolRunResult } from '../task_pool';
export enum FillPoolResult {
@ -29,11 +30,14 @@ type Converter<T1, T2> = (t: T1) => T2;
* @param converter - a function that converts task records to the appropriate task runner
*/
export async function fillPool<TRecord, TRunner>(
run: BatchRun<TRunner>,
fetchAvailableTasks: Fetcher<TRecord>,
converter: Converter<TRecord, TRunner>
converter: Converter<TRecord, TRunner>,
run: BatchRun<TRunner>
): Promise<FillPoolResult> {
performance.mark('fillPool.start');
const markClaimedTasksOnRerunCycle = after(2, () =>
performance.mark('fillPool.claimedOnRerunCycle')
);
while (true) {
const instances = await fetchAvailableTasks();
@ -46,7 +50,7 @@ export async function fillPool<TRecord, TRunner>(
);
return FillPoolResult.NoTasksClaimed;
}
markClaimedTasksOnRerunCycle();
const tasks = instances.map(converter);
if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {

View file

@ -37,7 +37,7 @@ const getMockConcreteTaskInstance = () => {
sequenceNumber: 1,
primaryTerm: 1,
attempts: 0,
status: 'idle',
status: TaskStatus.Idle,
runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()),
scheduledAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()),
startedAt: null,

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { pullFromSet } from './pull_from_set';
describe(`pullFromSet`, () => {
test(`doesnt pull from an empty set`, () => {
expect(pullFromSet(new Set(), 10)).toEqual([]);
});
test(`doesnt pull when there is no capacity`, () => {
expect(pullFromSet(new Set([1, 2, 3]), 0)).toEqual([]);
});
test(`pulls as many values as there are in the set`, () => {
expect(pullFromSet(new Set([1, 2, 3]), 3)).toEqual([1, 2, 3]);
});
test(`pulls as many values as there are in the set up to capacity`, () => {
expect(pullFromSet(new Set([1, 2, 3]), 2)).toEqual([1, 2]);
});
test(`modifies the original set`, () => {
const set = new Set([1, 2, 3]);
expect(pullFromSet(set, 2)).toEqual([1, 2]);
expect(set).toEqual(new Set([3]));
});
});

View file

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export function pullFromSet<T>(set: Set<T>, capacity: number) {
if (capacity > 0 && set.size > 0) {
const values = [];
for (const value of set) {
if (set.delete(value)) {
values.push(value);
if (values.length === capacity) {
return values;
}
}
}
return values;
}
return [];
}

View file

@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { curry } from 'lodash';
export interface Ok<T> {
tag: 'ok';
value: T;
}
export interface Err<E> {
tag: 'err';
error: E;
}
export type Result<T, E> = Ok<T> | Err<E>;
export function asOk<T>(value: T): Ok<T> {
return {
tag: 'ok',
value,
};
}
export function asErr<T>(error: T): Err<T> {
return {
tag: 'err',
error,
};
}
export function isOk<T, E>(result: Result<T, E>): result is Ok<T> {
return result.tag === 'ok';
}
export function isErr<T, E>(result: Result<T, E>): result is Err<E> {
return !isOk(result);
}
export async function promiseResult<T, E>(future: Promise<T>): Promise<Result<T, E>> {
try {
return asOk(await future);
} catch (e) {
return asErr(e);
}
}
export function unwrap<T, E>(result: Result<T, E>): T | E {
return isOk(result) ? result.value : result.error;
}
export function either<T, E>(
result: Result<T, E>,
onOk: (value: T) => void,
onErr: (error: E) => void
): Result<T, E> {
map<T, E, void>(result, onOk, onErr);
return result;
}
export async function eitherAsync<T, E>(
result: Result<T, E>,
onOk: (value: T) => Promise<void>,
onErr: (error: E) => Promise<void>
): Promise<Result<T, E> | void> {
return await map<T, E, Promise<void>>(result, onOk, onErr);
}
export function map<T, E, Resolution>(
result: Result<T, E>,
onOk: (value: T) => Resolution,
onErr: (error: E) => Resolution
): Resolution {
return isOk(result) ? onOk(result.value) : onErr(result.error);
}
export const mapR = curry(function<T, E, Resolution>(
onOk: (value: T) => Resolution,
onErr: (error: E) => Resolution,
result: Result<T, E>
): Resolution {
return map(result, onOk, onErr);
});
export const mapOk = curry(function<T, T2, E>(
onOk: (value: T) => Result<T2, E>,
result: Result<T, E>
): Result<T2, E> {
return isOk(result) ? onOk(result.value) : result;
});
export const mapErr = curry(function<T, E, E2>(
onErr: (error: E) => Result<T, E2>,
result: Result<T, E>
): Result<T, E2> {
return isOk(result) ? result : onErr(result.error);
});

View file

@ -46,6 +46,7 @@ describe('Task Manager Plugin', () => {
"fetch": [Function],
"registerTaskDefinitions": [Function],
"remove": [Function],
"runNow": [Function],
"schedule": [Function],
}
`);

View file

@ -11,6 +11,7 @@ export interface PluginSetupContract {
fetch: TaskManager['fetch'];
remove: TaskManager['remove'];
schedule: TaskManager['schedule'];
runNow: TaskManager['runNow'];
ensureScheduled: TaskManager['ensureScheduled'];
addMiddleware: TaskManager['addMiddleware'];
registerTaskDefinitions: TaskManager['registerTaskDefinitions'];
@ -60,6 +61,7 @@ export class Plugin {
fetch: (...args) => taskManager.fetch(...args),
remove: (...args) => taskManager.remove(...args),
schedule: (...args) => taskManager.schedule(...args),
runNow: (...args) => taskManager.runNow(...args),
ensureScheduled: (...args) => taskManager.ensureScheduled(...args),
addMiddleware: (...args) => taskManager.addMiddleware(...args),
registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args),

View file

@ -0,0 +1,171 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import _ from 'lodash';
import {
asUpdateByQuery,
shouldBeOneOf,
mustBeAllOf,
ExistsBoolClause,
TermBoolClause,
RangeBoolClause,
} from './query_clauses';
import {
updateFields,
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
RecuringTaskWithInterval,
taskWithLessThanMaxAttempts,
SortByRunAtAndRetryAt,
} from './mark_available_tasks_as_claimed';
import { TaskDictionary, TaskDefinition } from '../task';
describe('mark_available_tasks_as_claimed', () => {
test('generates query matching tasks to be claimed when polling for tasks', () => {
const definitions: TaskDictionary<TaskDefinition> = {
sampleTask: {
type: 'sampleTask',
title: 'title',
maxAttempts: 5,
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
otherTask: {
type: 'otherTask',
title: 'title',
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
};
const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
expect(
asUpdateByQuery({
query: mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has an interval or the attempts < the maximum configured
shouldBeOneOf<ExistsBoolClause | TermBoolClause | RangeBoolClause>(
RecuringTaskWithInterval,
...Object.entries(definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || defaultMaxAttempts)
)
)
),
update: updateFields({
ownerId: taskManagerId,
status: 'claiming',
retryAt: claimOwnershipUntil,
}),
sort: SortByRunAtAndRetryAt,
})
).toEqual({
query: {
bool: {
must: [
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
// Either task has an interval or the attempts < the maximum configured
{
bool: {
should: [
{ exists: { field: 'task.interval' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'sampleTask' } },
{
range: {
'task.attempts': {
lt: 5,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'otherTask' } },
{
range: {
'task.attempts': {
lt: 1,
},
},
},
],
},
},
],
},
},
],
},
},
sort: {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'painless',
source: `
if (doc['task.retryAt'].size()!=0) {
return doc['task.retryAt'].value.toInstant().toEpochMilli();
}
if (doc['task.runAt'].size()!=0) {
return doc['task.runAt'].value.toInstant().toEpochMilli();
}
`,
},
},
},
seq_no_primary_term: true,
script: {
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
lang: 'painless',
params: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
},
},
});
});
});

View file

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { defaultsDeep } from 'lodash';
import {
BoolClause,
IDsClause,
SortClause,
ScriptClause,
ExistsBoolClause,
TermBoolClause,
RangeBoolClause,
} from './query_clauses';
export const RecuringTaskWithInterval: ExistsBoolClause = { exists: { field: 'task.interval' } };
export function taskWithLessThanMaxAttempts(
type: string,
maxAttempts: number
): BoolClause<TermBoolClause | RangeBoolClause> {
return {
bool: {
must: [
{ term: { 'task.taskType': type } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
};
}
export const IdleTaskWithExpiredRunAt: BoolClause<TermBoolClause | RangeBoolClause> = {
bool: {
must: [{ term: { 'task.status': 'idle' } }, { range: { 'task.runAt': { lte: 'now' } } }],
},
};
export const taskWithIDsAndRunnableStatus = (
claimTasksById: string[]
): BoolClause<TermBoolClause | IDsClause> => ({
bool: {
must: [
{
bool: {
should: [{ term: { 'task.status': 'idle' } }, { term: { 'task.status': 'failed' } }],
},
},
{
ids: {
values: claimTasksById,
},
},
],
},
});
export const RunningOrClaimingTaskWithExpiredRetryAt: BoolClause<
TermBoolClause | RangeBoolClause
> = {
bool: {
must: [
{
bool: {
should: [{ term: { 'task.status': 'running' } }, { term: { 'task.status': 'claiming' } }],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
};
export const SortByRunAtAndRetryAt: SortClause = {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'painless',
source: `
if (doc['task.retryAt'].size()!=0) {
return doc['task.retryAt'].value.toInstant().toEpochMilli();
}
if (doc['task.runAt'].size()!=0) {
return doc['task.runAt'].value.toInstant().toEpochMilli();
}
`,
},
},
};
const SORT_VALUE_TO_BE_FIRST = 0;
export const sortByIdsThenByScheduling = (claimTasksById: string[]): SortClause => {
const {
_script: {
script: { source },
},
} = SortByRunAtAndRetryAt;
return defaultsDeep(
{
_script: {
script: {
source: `
if(params.ids.contains(doc['_id'].value)){
return ${SORT_VALUE_TO_BE_FIRST};
}
${source}
`,
params: { ids: claimTasksById },
},
},
},
SortByRunAtAndRetryAt
);
};
export const updateFields = (fieldUpdates: {
[field: string]: string | number | Date;
}): ScriptClause => ({
source: Object.keys(fieldUpdates)
.map(field => `ctx._source.task.${field}=params.${field};`)
.join(' '),
lang: 'painless',
params: fieldUpdates,
});

View file

@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export interface TermBoolClause {
term: { [field: string]: string | string[] };
}
export interface RangeBoolClause {
range: { [field: string]: { lte: string | number } | { lt: string | number } };
}
export interface ExistsBoolClause {
exists: { field: string };
}
export interface IDsClause {
ids: {
values: string[];
};
}
export interface ShouldClause<T> {
should: Array<BoolClause<T> | IDsClause | T>;
}
export interface MustClause<T> {
must: Array<BoolClause<T> | IDsClause | T>;
}
export interface BoolClause<T> {
bool: MustClause<T> | ShouldClause<T>;
}
export interface SortClause {
_script: {
type: string;
order: string;
script: {
lang: string;
source: string;
params?: { [param: string]: string | string[] };
};
};
}
export interface ScriptClause {
source: string;
lang: string;
params: {
[field: string]: string | number | Date;
};
}
export interface UpdateByQuery<T> {
query: BoolClause<T>;
sort: SortClause;
seq_no_primary_term: true;
script: ScriptClause;
}
export function shouldBeOneOf<T>(
...should: Array<BoolClause<T> | IDsClause | T>
): {
bool: ShouldClause<T>;
} {
return {
bool: {
should,
},
};
}
export function mustBeAllOf<T>(
...must: Array<BoolClause<T> | IDsClause | T>
): {
bool: MustClause<T>;
} {
return {
bool: {
must,
},
};
}
export function asUpdateByQuery<T>({
query,
update,
sort,
}: {
query: BoolClause<T>;
update: ScriptClause;
sort: SortClause;
}): UpdateByQuery<T> {
return {
query,
sort,
seq_no_primary_term: true,
script: update,
};
}

View file

@ -64,6 +64,19 @@ export interface RunResult {
state: Record<string, any>;
}
export interface SuccessfulRunResult {
runAt?: Date;
state?: Record<string, any>;
}
export interface FailedRunResult extends SuccessfulRunResult {
error: Error;
}
export interface FailedTaskResult {
status: TaskStatus.Failed;
}
export const validateRunResult = Joi.object({
runAt: Joi.date().optional(),
error: Joi.object().optional(),
@ -150,7 +163,18 @@ export interface TaskDictionary<T extends TaskDefinition> {
[taskType: string]: T;
}
export type TaskStatus = 'idle' | 'claiming' | 'running' | 'failed';
export enum TaskStatus {
Idle = 'idle',
Claiming = 'claiming',
Running = 'running',
Failed = 'failed',
}
export enum TaskLifecycleResult {
NotFound = 'notFound',
}
export type TaskLifecycle = TaskStatus | TaskLifecycleResult;
/*
* A task instance represents all of the data required to store, fetch,

View file

@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ConcreteTaskInstance } from './task';
import { Result, Err } from './lib/result_type';
export enum TaskEventType {
TASK_CLAIM = 'TASK_CLAIM',
TASK_MARK_RUNNING = 'TASK_MARK_RUNNING',
TASK_RUN = 'TASK_RUN',
TASK_RUN_REQUEST = 'TASK_RUN_REQUEST',
}
export interface TaskEvent<T, E> {
id: string;
type: TaskEventType;
event: Result<T, E>;
}
export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRun = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;
export function asTaskMarkRunningEvent(
id: string,
event: Result<ConcreteTaskInstance, Error>
): TaskMarkRunning {
return {
id,
type: TaskEventType.TASK_MARK_RUNNING,
event,
};
}
export function asTaskRunEvent(id: string, event: Result<ConcreteTaskInstance, Error>): TaskRun {
return {
id,
type: TaskEventType.TASK_RUN,
event,
};
}
export function asTaskClaimEvent(
id: string,
event: Result<ConcreteTaskInstance, Error>
): TaskClaim {
return {
id,
type: TaskEventType.TASK_CLAIM,
event,
};
}
export function asTaskRunRequestEvent(
id: string,
// we only emit a TaskRunRequest event when it fails
event: Err<Error>
): TaskRunRequest {
return {
id,
type: TaskEventType.TASK_RUN_REQUEST,
event,
};
}
export function isTaskMarkRunningEvent(
taskEvent: TaskEvent<any, any>
): taskEvent is TaskMarkRunning {
return taskEvent.type === TaskEventType.TASK_MARK_RUNNING;
}
export function isTaskRunEvent(taskEvent: TaskEvent<any, any>): taskEvent is TaskRun {
return taskEvent.type === TaskEventType.TASK_RUN;
}
export function isTaskClaimEvent(taskEvent: TaskEvent<any, any>): taskEvent is TaskClaim {
return taskEvent.type === TaskEventType.TASK_CLAIM;
}
export function isTaskRunRequestEvent(taskEvent: TaskEvent<any, any>): taskEvent is TaskRunRequest {
return taskEvent.type === TaskEventType.TASK_RUN_REQUEST;
}

View file

@ -13,6 +13,7 @@ const createTaskManagerMock = () => {
ensureScheduled: jest.fn(),
schedule: jest.fn(),
fetch: jest.fn(),
runNow: jest.fn(),
remove: jest.fn(),
start: jest.fn(),
stop: jest.fn(),

View file

@ -6,7 +6,20 @@
import _ from 'lodash';
import sinon from 'sinon';
import { TaskManager, claimAvailableTasks } from './task_manager';
import { Subject } from 'rxjs';
import {
asTaskMarkRunningEvent,
asTaskRunEvent,
asTaskClaimEvent,
asTaskRunRequestEvent,
} from './task_events';
import {
TaskManager,
claimAvailableTasks,
awaitTaskRunResult,
TaskLifecycleEvent,
} from './task_manager';
// Task manager uses an unconventional directory structure so the linter marks this as a violation, server files should
// be moved under task_manager/server/
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
@ -14,6 +27,8 @@ import { savedObjectsClientMock } from 'src/core/server/mocks';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { SavedObjectsSerializer, SavedObjectsSchema } from 'src/core/server';
import { mockLogger } from './test_utils';
import { asErr, asOk } from './lib/result_type';
import { ConcreteTaskInstance, TaskLifecycleResult, TaskStatus } from './task';
const savedObjectsClient = savedObjectsClientMock.create();
const serializer = new SavedObjectsSerializer(new SavedObjectsSchema());
@ -277,6 +292,165 @@ describe('TaskManager', () => {
);
});
describe('runNow', () => {
describe('awaitTaskRunResult', () => {
test('resolves when the task run succeeds', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn();
const result = awaitTaskRunResult(id, events$, getLifecycle);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskRunEvent(id, asOk(task)));
return expect(result).resolves.toEqual({ id });
});
test('rejects when the task run fails', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn();
const result = awaitTaskRunResult(id, events$, getLifecycle);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
});
test('rejects when the task mark as running fails', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn();
const result = awaitTaskRunResult(id, events$, getLifecycle);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
});
test('when a task claim fails we ensure the task exists', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskLifecycleResult.NotFound);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it does not exist`)
);
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we ensure the task isnt already claimed', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Claiming);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
);
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we ensure the task isnt already running', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Running);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
);
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('rejects when the task run fails due to capacity', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Idle);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskRunRequestEvent(id, asErr(new Error('failed to buffer request'))));
await expect(result).rejects.toEqual(
new Error(
`Failed to run task "${id}" as Task Manager is at capacity, please try again later`
)
);
expect(getLifecycle).not.toHaveBeenCalled();
});
test('when a task claim fails we return the underlying error if the task is idle', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Idle);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
await expect(result).rejects.toEqual(new Error('failed to claim'));
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we return the underlying error if the task is failed', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Failed);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
await expect(result).rejects.toEqual(new Error('failed to claim'));
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('ignores task run success of other tasks', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const differentTask = '4bebf429-181b-4518-bb7d-b4246d8a35f0';
const getLifecycle = jest.fn();
const result = awaitTaskRunResult(id, events$, getLifecycle);
const task = { id } as ConcreteTaskInstance;
const otherTask = { id: differentTask } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskClaimEvent(differentTask, asOk(otherTask)));
events$.next(asTaskRunEvent(differentTask, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
});
});
});
describe('claimAvailableTasks', () => {
test('should claim Available Tasks when there are available workers', () => {
const logger = mockLogger();
@ -284,18 +458,18 @@ describe('TaskManager', () => {
const availableWorkers = 1;
claimAvailableTasks(claim, availableWorkers, logger);
claimAvailableTasks([], claim, availableWorkers, logger);
expect(claim).toHaveBeenCalledTimes(1);
});
test('shouldnt claim Available Tasks when there are no available workers', () => {
test('should not claim Available Tasks when there are no available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));
const availableWorkers = 0;
claimAvailableTasks(claim, availableWorkers, logger);
claimAvailableTasks([], claim, availableWorkers, logger);
expect(claim).not.toHaveBeenCalled();
});
@ -324,7 +498,7 @@ describe('TaskManager', () => {
});
});
claimAvailableTasks(claim, 10, logger);
claimAvailableTasks([], claim, 10, logger);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot(

View file

@ -3,12 +3,30 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Subject, Observable, Subscription } from 'rxjs';
import { filter } from 'rxjs/operators';
import { performance } from 'perf_hooks';
// Task manager uses an unconventional directory structure so the linter marks this as a violation, server files should
// be moved under task_manager/server/
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { SavedObjectsClientContract, SavedObjectsSerializer } from 'src/core/server';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, some, map as mapOptional } from 'fp-ts/lib/Option';
import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { Logger } from './types';
import {
TaskMarkRunning,
TaskRun,
TaskClaim,
TaskRunRequest,
isTaskRunEvent,
isTaskClaimEvent,
isTaskRunRequestEvent,
asTaskRunRequestEvent,
} from './task_events';
import { fillPool, FillPoolResult } from './lib/fill_pool';
import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware';
import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions';
@ -20,10 +38,13 @@ import {
RunContext,
TaskInstanceWithId,
TaskInstance,
TaskLifecycle,
TaskLifecycleResult,
TaskStatus,
} from './task';
import { TaskPoller } from './task_poller';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { TaskManagerRunner, TaskRunner } from './task_runner';
import {
FetchOpts,
FetchResult,
@ -43,6 +64,12 @@ export interface TaskManagerOpts {
serializer: SavedObjectsSerializer;
}
interface RunNowResult {
id: string;
}
export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRunRequest;
/*
* The TaskManager is the public interface into the task manager system. This glues together
* all of the disparate modules in one integration point. The task manager operates in two different ways:
@ -57,14 +84,19 @@ export interface TaskManagerOpts {
* The public interface into the task manager system.
*/
export class TaskManager {
private isStarted = false;
private maxWorkers: number;
private readonly pollerInterval: number;
private definitions: TaskDictionary<TaskDefinition>;
private definitions: TaskDictionary<TaskDefinition> = {};
private store: TaskStore;
private poller: TaskPoller<FillPoolResult>;
private logger: Logger;
private pool: TaskPool;
// all task related events (task claimed, task marked as running, etc.) are emitted through events$
private events$ = new Subject<TaskLifecycleEvent>();
// all on-demand requests we wish to pipe into the poller
private claimRequests$ = new Subject<Option<string>>();
// the task poller that polls for work on fixed intervals and on demand
private poller$: Observable<Result<FillPoolResult, PollingError<string>>>;
// our subscription to the poller
private pollingSubscription: Subscription = Subscription.EMPTY;
private startQueue: Array<() => void> = [];
private middleware = {
beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts,
@ -78,9 +110,6 @@ export class TaskManager {
* mechanism.
*/
constructor(opts: TaskManagerOpts) {
this.maxWorkers = opts.config.get('xpack.task_manager.max_workers');
this.pollerInterval = opts.config.get('xpack.task_manager.poll_interval');
this.definitions = {};
this.logger = opts.logger;
const taskManagerId = opts.config.get('server.uuid');
@ -93,7 +122,7 @@ export class TaskManager {
this.logger.info(`TaskManager is identified by the Kibana UUID: ${taskManagerId}`);
}
const store = new TaskStore({
this.store = new TaskStore({
serializer: opts.serializer,
savedObjectsRepository: opts.savedObjectsRepository,
callCluster: opts.callWithInternalUser,
@ -102,63 +131,85 @@ export class TaskManager {
definitions: this.definitions,
taskManagerId: `kibana:${taskManagerId}`,
});
// pipe store events into the TaskManager's event stream
this.store.events.subscribe(event => this.events$.next(event));
const pool = new TaskPool({
this.pool = new TaskPool({
logger: this.logger,
maxWorkers: this.maxWorkers,
maxWorkers: opts.config.get('xpack.task_manager.max_workers'),
});
const createRunner = (instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
logger: this.logger,
instance,
store,
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
beforeMarkRunning: this.middleware.beforeMarkRunning,
});
const poller = new TaskPoller<FillPoolResult>({
logger: this.logger,
this.poller$ = createTaskPoller<string, FillPoolResult>({
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
work: (): Promise<FillPoolResult> =>
fillPool(
async tasks => await pool.run(tasks),
() =>
claimAvailableTasks(
this.store.claimAvailableTasks.bind(this.store),
this.pool.availableWorkers,
this.logger
),
createRunner
),
bufferCapacity: opts.config.get('xpack.task_manager.request_capacity'),
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
});
this.pool = pool;
this.store = store;
this.poller = poller;
}
private emitEvent = (event: TaskLifecycleEvent) => {
this.events$.next(event);
};
private attemptToRun(task: Option<string> = none) {
this.claimRequests$.next(task);
}
private createTaskRunnerForTask = (instance: ConcreteTaskInstance) => {
return new TaskManagerRunner({
logger: this.logger,
instance,
store: this.store,
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
beforeMarkRunning: this.middleware.beforeMarkRunning,
onTaskEvent: this.emitEvent,
});
};
public get isStarted() {
return !this.pollingSubscription.closed;
}
private pollForWork = async (...tasksToClaim: string[]): Promise<FillPoolResult> => {
return fillPool(
// claim available tasks
() =>
claimAvailableTasks(
tasksToClaim.splice(0, this.pool.availableWorkers),
this.store.claimAvailableTasks,
this.pool.availableWorkers,
this.logger
),
// wrap each task in a Task Runner
this.createTaskRunnerForTask,
// place tasks in the Task Pool
async (tasks: TaskRunner[]) => await this.pool.run(tasks)
);
};
/**
* Starts up the task manager and starts picking up tasks.
*/
public start() {
this.isStarted = true;
// Some calls are waiting until task manager is started
this.startQueue.forEach(fn => fn());
this.startQueue = [];
const startPoller = async () => {
try {
await this.poller.start();
} catch (err) {
// FIXME: check the type of error to make sure it's actually an ES error
this.logger.warn(`PollError ${err.message}`);
if (!this.isStarted) {
// Some calls are waiting until task manager is started
this.startQueue.forEach(fn => fn());
this.startQueue = [];
// rety again to initialize store and poller, using the timing of
// task_manager's configurable poll interval
const retryInterval = this.pollerInterval;
setTimeout(() => startPoller(), retryInterval);
}
};
startPoller();
this.pollingSubscription = this.poller$.subscribe(
mapErr((error: PollingError<string>) => {
if (error.type === PollingErrorType.RequestCapacityReached) {
pipe(
error.data,
mapOptional(id => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
);
}
this.logger.error(error.message);
})
);
}
}
private async waitUntilStarted() {
@ -173,8 +224,10 @@ export class TaskManager {
* Stops the task manager and cancels running tasks.
*/
public stop() {
this.poller.stop();
this.pool.cancelRunningTasks();
if (this.isStarted) {
this.pollingSubscription.unsubscribe();
this.pool.cancelRunningTasks();
}
}
/**
@ -221,10 +274,27 @@ export class TaskManager {
taskInstance,
});
const result = await this.store.schedule(modifiedTask);
this.poller.attemptWork();
this.attemptToRun();
return result;
}
/**
* Run task.
*
* @param taskId - The id of the task to run.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async runNow(taskId: string): Promise<RunNowResult> {
await this.waitUntilStarted();
return new Promise(async (resolve, reject) => {
awaitTaskRunResult(taskId, this.events$, this.store.getLifecycle.bind(this.store))
.then(resolve)
.catch(reject);
this.attemptToRun(some(taskId));
});
}
/**
* Schedules a task with an Id
*
@ -281,6 +351,7 @@ export class TaskManager {
}
export async function claimAvailableTasks(
claimTasksById: string[],
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
availableWorkers: number,
logger: Logger
@ -292,6 +363,7 @@ export async function claimAvailableTasks(
const { docs, claimedTasks } = await claim({
size: availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
claimTasksById,
});
if (claimedTasks === 0) {
@ -327,3 +399,59 @@ export async function claimAvailableTasks(
}
return [];
}
export async function awaitTaskRunResult(
taskId: string,
events$: Subject<TaskLifecycleEvent>,
getLifecycle: (id: string) => Promise<TaskLifecycle>
): Promise<RunNowResult> {
return new Promise((resolve, reject) => {
const subscription = events$
// listen for all events related to the current task
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
.subscribe((taskEvent: TaskLifecycleEvent) => {
either(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run successfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
} else if (isTaskClaimEvent(taskEvent)) {
reject(
map(
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
await promiseResult<TaskLifecycle, Error>(getLifecycle(taskId)),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return error;
},
() => error
)
);
}
return reject(error);
}
);
});
});
}

View file

@ -5,138 +5,403 @@
*/
import _ from 'lodash';
import sinon from 'sinon';
import { TaskPoller } from './task_poller';
import { mockLogger, resolvable, sleep } from './test_utils';
import { Subject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable } from './test_utils';
import { asOk, asErr } from './lib/result_type';
describe('TaskPoller', () => {
beforeEach(() => {
const callCluster = sinon.stub();
callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} }));
});
beforeEach(() => jest.useFakeTimers());
describe('interval tests', () => {
let clock: sinon.SinonFakeTimers;
test(
'initializes the poller with the provided interval',
fakeSchedulers(async advance => {
const pollInterval = 100;
const bufferCapacity = 5;
const halfInterval = Math.floor(pollInterval / 2);
beforeEach(() => {
clock = sinon.useFakeTimers();
});
afterEach(() => clock.restore());
test('runs the work function on an interval', async () => {
const pollInterval = _.random(10, 20);
const done = resolvable();
const work = sinon.spy(() => {
done.resolve();
return Promise.resolve();
});
const poller = new TaskPoller({
const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
pollInterval,
bufferCapacity,
getCapacity: () => 1,
work,
logger: mockLogger(),
pollRequests$: new Subject<Option<void>>(),
}).subscribe(() => {});
// `work` is async, we have to force a node `tick`
await sleep(0);
advance(halfInterval);
expect(work).toHaveBeenCalledTimes(0);
advance(halfInterval);
await sleep(0);
expect(work).toHaveBeenCalledTimes(1);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
})
);
test(
'filters interval polling on capacity',
fakeSchedulers(async advance => {
const pollInterval = 100;
const bufferCapacity = 2;
const work = jest.fn(async () => true);
let hasCapacity = true;
createTaskPoller<void, boolean>({
pollInterval,
bufferCapacity,
work,
getCapacity: () => (hasCapacity ? 1 : 0),
pollRequests$: new Subject<Option<void>>(),
}).subscribe(() => {});
expect(work).toHaveBeenCalledTimes(0);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
hasCapacity = false;
await sleep(0);
advance(pollInterval);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
hasCapacity = true;
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(3);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(4);
})
);
test(
'requests with no arguments (nudge requests) are queued on-demand in between intervals',
fakeSchedulers(async advance => {
const pollInterval = 100;
const bufferCapacity = 2;
const querterInterval = Math.floor(pollInterval / 4);
const halfInterval = querterInterval * 2;
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<void>>();
createTaskPoller<void, boolean>({
pollInterval,
bufferCapacity,
work,
getCapacity: () => 1,
pollRequests$,
}).subscribe(jest.fn());
expect(work).toHaveBeenCalledTimes(0);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);
advance(querterInterval);
await sleep(0);
expect(work).toHaveBeenCalledTimes(1);
pollRequests$.next(none);
expect(work).toHaveBeenCalledTimes(2);
expect(work).toHaveBeenNthCalledWith(2);
await sleep(0);
advance(querterInterval);
expect(work).toHaveBeenCalledTimes(2);
await sleep(0);
advance(halfInterval);
expect(work).toHaveBeenCalledTimes(3);
})
);
test(
'requests with no arguments (nudge requests) are dropped when there is no capacity',
fakeSchedulers(async advance => {
const pollInterval = 100;
const bufferCapacity = 2;
const querterInterval = Math.floor(pollInterval / 4);
const halfInterval = querterInterval * 2;
let hasCapacity = true;
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<void>>();
createTaskPoller<void, boolean>({
pollInterval,
bufferCapacity,
work,
getCapacity: () => (hasCapacity ? 1 : 0),
pollRequests$,
}).subscribe(() => {});
expect(work).toHaveBeenCalledTimes(0);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);
hasCapacity = false;
await sleep(0);
advance(querterInterval);
pollRequests$.next(none);
expect(work).toHaveBeenCalledTimes(1);
await sleep(0);
advance(querterInterval);
hasCapacity = true;
advance(halfInterval);
expect(work).toHaveBeenCalledTimes(2);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(3);
})
);
test(
'requests with arguments are emitted',
fakeSchedulers(async advance => {
const pollInterval = 100;
const bufferCapacity = 2;
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, boolean>({
pollInterval,
bufferCapacity,
work,
getCapacity: () => 1,
pollRequests$,
}).subscribe(() => {});
advance(pollInterval);
pollRequests$.next(some('one'));
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledWith('one');
pollRequests$.next(some('two'));
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledWith('two');
})
);
test(
'waits for work to complete before emitting the next event',
fakeSchedulers(async advance => {
const pollInterval = 100;
const bufferCapacity = 2;
const worker = resolvable();
const handler = jest.fn();
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, string[]>({
pollInterval,
bufferCapacity,
work: async (...args) => {
await worker;
return args;
},
getCapacity: () => 5,
pollRequests$,
}).subscribe(handler);
pollRequests$.next(some('one'));
advance(pollInterval);
// work should now be in progress
pollRequests$.next(none);
pollRequests$.next(some('two'));
pollRequests$.next(some('three'));
advance(pollInterval);
await sleep(pollInterval);
expect(handler).toHaveBeenCalledTimes(0);
worker.resolve();
advance(pollInterval);
await sleep(pollInterval);
expect(handler).toHaveBeenCalledWith(asOk(['one']));
advance(pollInterval);
expect(handler).toHaveBeenCalledWith(asOk(['two', 'three']));
})
);
test(
'returns an error when polling for work fails',
fakeSchedulers(async advance => {
const pollInterval = 100;
const bufferCapacity = 2;
const handler = jest.fn();
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, string[]>({
pollInterval,
bufferCapacity,
work: async (...args) => {
throw new Error('failed to work');
},
getCapacity: () => 5,
pollRequests$,
}).subscribe(handler);
advance(pollInterval);
await sleep(0);
const expectedError = new PollingError<string>(
'Failed to poll for work: Error: failed to work',
PollingErrorType.WorkError,
none
);
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);
})
);
test(
'continues polling after work fails',
fakeSchedulers(async advance => {
const pollInterval = 100;
const bufferCapacity = 2;
const handler = jest.fn();
const pollRequests$ = new Subject<Option<string>>();
let callCount = 0;
const work = jest.fn(async () => {
callCount++;
if (callCount === 2) {
throw new Error('failed to work');
}
return callCount;
});
createTaskPoller<string, number>({
pollInterval,
bufferCapacity,
work,
getCapacity: () => 5,
pollRequests$,
}).subscribe(handler);
await poller.start();
advance(pollInterval);
await sleep(0);
sinon.assert.calledOnce(work);
await done;
expect(handler).toHaveBeenCalledWith(asOk(1));
clock.tick(pollInterval - 1);
sinon.assert.calledOnce(work);
clock.tick(1);
sinon.assert.calledTwice(work);
});
});
advance(pollInterval);
await sleep(0);
test('logs, but does not crash if the work function fails', async () => {
let count = 0;
const logger = mockLogger();
const doneWorking = resolvable();
const poller = new TaskPoller({
logger,
pollInterval: 1,
work: async () => {
++count;
if (count === 1) {
throw new Error('Dang it!');
}
if (count > 1) {
poller.stop();
doneWorking.resolve();
}
},
});
const expectedError = new PollingError<string>(
'Failed to poll for work: Error: failed to work',
PollingErrorType.WorkError,
none
);
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
expect(handler.mock.calls[1][0].error.type).toEqual(PollingErrorType.WorkError);
expect(handler).not.toHaveBeenCalledWith(asOk(2));
poller.start();
advance(pollInterval);
await sleep(0);
await doneWorking;
expect(handler).toHaveBeenCalledWith(asOk(3));
})
);
expect(count).toEqual(2);
expect(logger.error.mock.calls[0][0]).toMatchInlineSnapshot(
`"Failed to poll for work: Error: Dang it!"`
);
});
test(
'returns a request capacity error when a new request is emitted but the poller is at buffer capacity',
fakeSchedulers(async advance => {
const pollInterval = 1000;
const bufferCapacity = 2;
test('is stoppable', async () => {
const doneWorking = resolvable();
const work = sinon.spy(async () => {
poller.stop();
doneWorking.resolve();
});
const handler = jest.fn();
const work = jest.fn(async () => {});
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, void>({
pollInterval,
bufferCapacity,
work,
getCapacity: () => 5,
pollRequests$,
}).subscribe(handler);
const poller = new TaskPoller({
logger: mockLogger(),
pollInterval: 1,
work,
});
// advance(pollInterval);
poller.start();
await doneWorking;
await sleep(10);
pollRequests$.next(some('one'));
sinon.assert.calledOnce(work);
});
await sleep(0);
advance(pollInterval);
test('disregards duplicate calls to "start"', async () => {
const doneWorking = resolvable();
const work = sinon.spy(async () => {
await doneWorking;
});
const poller = new TaskPoller({
pollInterval: 1,
logger: mockLogger(),
work,
});
expect(work).toHaveBeenCalledWith('one');
await poller.start();
poller.start();
poller.start();
poller.start();
pollRequests$.next(some('two'));
pollRequests$.next(some('three'));
// three consecutive should cause us to go above capacity
pollRequests$.next(some('four'));
poller.stop();
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledWith('two', 'three');
doneWorking.resolve();
pollRequests$.next(some('five'));
pollRequests$.next(some('six'));
sinon.assert.calledOnce(work);
});
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledWith('five', 'six');
test('waits for work before polling', async () => {
const doneWorking = resolvable();
const work = sinon.spy(async () => {
await sleep(10);
poller.stop();
doneWorking.resolve();
});
const poller = new TaskPoller({
pollInterval: 1,
logger: mockLogger(),
work,
});
poller.start();
await doneWorking;
sinon.assert.calledOnce(work);
});
expect(handler).toHaveBeenCalledWith(
asErr(
new PollingError<string>(
'Failed to poll for work: request capacity reached',
PollingErrorType.RequestCapacityReached,
some('four')
)
)
);
})
);
});

View file

@ -9,107 +9,150 @@
*/
import { performance } from 'perf_hooks';
import { Logger } from './types';
import { after } from 'lodash';
import { Subject, merge, interval, of, Observable } from 'rxjs';
import { mapTo, filter, scan, concatMap, tap, catchError } from 'rxjs/operators';
type WorkFn<T> = () => Promise<T>;
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
import { pullFromSet } from './lib/pull_from_set';
import {
Result,
Err,
isErr,
map as mapResult,
asOk,
asErr,
promiseResult,
} from './lib/result_type';
interface Opts<T> {
type WorkFn<T, H> = (...params: T[]) => Promise<H>;
interface Opts<T, H> {
pollInterval: number;
logger: Logger;
work: WorkFn<T>;
bufferCapacity: number;
getCapacity: () => number;
pollRequests$: Observable<Option<T>>;
work: WorkFn<T, H>;
}
/**
* Performs work on a scheduled interval, logging any errors. This waits for work to complete
* (or error) prior to attempting another run.
* Constructs a new TaskPoller stream, which emits events on demand and on a scheduled interval, waiting for capacity to be available before emitting more events.
*
* @param opts
* @prop {number} pollInterval - How often, in milliseconds, an event will be emitted, assuming there's capacity to do so
* @prop {() => number} getCapacity - A function returning the current capacity, used to decide whether there is room to emit new events
* @prop {Observable<Option<T>>} pollRequests$ - A stream of requests for polling which can provide an optional argument for the polling phase
* @prop {number} bufferCapacity - How many requests do we allow our buffer to accumulate before rejecting new ones?
* @prop {(...params: T[]) => Promise<H>} work - The work we wish to execute in order to `poll`; this is the operation actually executed on request
*
* @returns {Observable<Result<H, PollingError<T>>>} - An observable which emits the result of the `work` function whenever a polling event takes place. Requests are buffered in a
* singleton Set representing a queue of unique request arguments of type T, which holds all the buffered request arguments streamed in via pollRequests$
*/
export class TaskPoller<T> {
private isStarted = false;
private isWorking = false;
private timeout: any;
private pollInterval: number;
private logger: Logger;
private work: WorkFn<T>;
export function createTaskPoller<T, H>({
pollInterval,
getCapacity,
pollRequests$,
bufferCapacity,
work,
}: Opts<T, H>): Observable<Result<H, PollingError<T>>> {
const hasCapacity = () => getCapacity() > 0;
/**
* Constructs a new TaskPoller.
*
* @param opts
* @prop {number} pollInterval - How often, in milliseconds, we will run the work function
* @prop {Logger} logger - The task manager logger
* @prop {WorkFn} work - An empty, asynchronous function that performs the desired work
*/
constructor(opts: Opts<T>) {
this.pollInterval = opts.pollInterval;
this.logger = opts.logger;
this.work = opts.work;
}
const errors$ = new Subject<Err<PollingError<T>>>();
/**
* Starts the poller. If the poller is already running, this has no effect.
*/
public async start() {
if (this.isStarted) {
return;
}
this.isStarted = true;
const poll = async () => {
await this.attemptWork();
performance.mark('TaskPoller.sleep');
if (this.isStarted) {
this.timeout = setTimeout(
tryAndLogOnError(() => {
performance.mark('TaskPoller.poll');
performance.measure('TaskPoller.sleepDuration', 'TaskPoller.sleep', 'TaskPoller.poll');
poll();
}, this.logger),
this.pollInterval
const requestWorkProcessing$ = merge(
// emit a polling event on demand
pollRequests$,
// emit a polling event on a fixed interval
interval(pollInterval).pipe(mapTo(none))
).pipe(
// buffer all requests in a single set (to remove duplicates) as we don't want
// work to take place in parallel (it could cause Task Manager to pull in the same
// task twice)
scan<Option<T>, Set<T>>((queue, request) => {
if (isErr(pushOptionalIntoSet(queue, bufferCapacity, request))) {
// value wasn't pushed into the buffer, we must be at capacity
errors$.next(
asPollingError<T>(
`request capacity reached`,
PollingErrorType.RequestCapacityReached,
request
)
);
}
};
return queue;
}, new Set<T>()),
// only emit polling events when there's capacity to handle them
filter(hasCapacity),
// take as many buffered request arguments as we have capacity for and call `work` with
// them. If the queue is empty this will still trigger work to be done
concatMap(async (set: Set<T>) => {
closeSleepPerf();
return mapResult<H, Error, Result<H, PollingError<T>>>(
await promiseResult<H, Error>(work(...pullFromSet(set, getCapacity()))),
workResult => asOk(workResult),
(err: Error) => {
return asPollingError<T>(err, PollingErrorType.WorkError);
}
);
}),
tap(openSleepPerf),
// catch errors during polling for work
catchError((err: Error) => of(asPollingError<T>(err, PollingErrorType.WorkError)))
);
poll();
}
return merge(requestWorkProcessing$, errors$);
}
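For context, a minimal usage sketch of the new poller (the `fillPool` work function and the `availableWorkers` capacity source are illustrative assumptions, not part of this diff):

import { Subject } from 'rxjs';
import { Option, some } from 'fp-ts/lib/Option';

const pollRequests$ = new Subject<Option<string>>();
const poller$ = createTaskPoller<string, number>({
  pollInterval: 3000,
  bufferCapacity: 1000,
  // assumed helper returning the number of free workers
  getCapacity: () => availableWorkers(),
  pollRequests$,
  // assumed work function that claims & runs tasks, resolving with the number of tasks claimed
  work: async (...claimTasksById: string[]) => fillPool(claimTasksById),
});
poller$.subscribe(result => {
  // result is Result<number, PollingError<string>>: Ok for a completed work cycle,
  // Err when work throws or when the request buffer is over capacity
});
// a runNow(id)-style push: ask the poller to claim a specific task on its next cycle
pollRequests$.next(some('my-task-id'));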
/**
* Unwraps optional values and pushes them into a set
* @param set A Set of generic type T
* @param maxCapacity How many values are we allowed to push into the set
* @param value An optional T to push into the set if it is there
*/
function pushOptionalIntoSet<T>(
set: Set<T>,
maxCapacity: number,
value: Option<T>
): Result<Set<T>, Set<T>> {
return pipe(
value,
mapOptional<T, Result<Set<T>, Set<T>>>(req => {
if (set.size >= maxCapacity) {
return asErr(set);
}
set.add(req);
return asOk(set);
}),
getOrElse(() => asOk(set) as Result<Set<T>, Set<T>>)
);
}
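A quick behavioural sketch of this helper (values assumed for illustration): the Result wraps the same Set either way, so callers can match on Ok/Err to detect whether the buffer is full.

const buffer = new Set<string>(['a']);
pushOptionalIntoSet(buffer, 2, some('b')); // asOk(buffer)  - buffer is now {'a', 'b'}
pushOptionalIntoSet(buffer, 2, some('c')); // asErr(buffer) - at capacity, 'c' is dropped
pushOptionalIntoSet(buffer, 2, none);      // asOk(buffer)  - nothing to push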
/**
* Stops the poller.
*/
public stop() {
this.isStarted = false;
clearTimeout(this.timeout);
this.timeout = undefined;
}
export enum PollingErrorType {
WorkError,
RequestCapacityReached,
}
/**
* Runs the work function. If the work function is currently running,
* this has no effect.
*/
public async attemptWork() {
if (!this.isStarted || this.isWorking) {
return;
}
function asPollingError<T>(err: string | Error, type: PollingErrorType, data: Option<T> = none) {
return asErr(new PollingError<T>(`Failed to poll for work: ${err}`, type, data));
}
this.isWorking = true;
try {
await this.work();
} catch (err) {
this.logger.error(`Failed to poll for work: ${err}`);
} finally {
this.isWorking = false;
}
export class PollingError<T> extends Error {
public readonly type: PollingErrorType;
public readonly data: Option<T>;
constructor(message: string, type: PollingErrorType, data: Option<T>) {
super(message);
Object.setPrototypeOf(this, new.target.prototype);
this.type = type;
this.data = data;
}
}
function tryAndLogOnError(fn: Function, logger: Logger): Function {
return () => {
try {
fn();
} catch (err) {
logger.error(`Task Poller polling phase failed: ${err}`);
}
};
}
const openSleepPerf = () => {
performance.mark('TaskPoller.sleep');
};
// we only want to close after an open has been called, as we're counting the time *between* work cycles
// so we'll ignore the first call to `closeSleepPerf` but we will run every subsequent call
const closeSleepPerf = after(2, () => {
performance.mark('TaskPoller.poll');
performance.measure('TaskPoller.sleepDuration', 'TaskPoller.sleep', 'TaskPoller.poll');
});

View file

@ -7,6 +7,7 @@
import sinon from 'sinon';
import { TaskPool, TaskPoolRunResult } from './task_pool';
import { mockLogger, resolvable, sleep } from './test_utils';
import { asOk } from './lib/result_type';
describe('TaskPool', () => {
test('occupiedWorkers are a sum of running tasks', async () => {
@ -128,13 +129,13 @@ describe('TaskPool', () => {
const firstRun = sinon.spy(async () => {
await sleep(0);
firstWork.resolve();
return { state: {} };
return asOk({ state: {} });
});
const secondWork = resolvable();
const secondRun = sinon.spy(async () => {
await sleep(0);
secondWork.resolve();
return { state: {} };
return asOk({ state: {} });
});
const result = await pool.run([
@ -179,9 +180,7 @@ describe('TaskPool', () => {
this.isExpired = true;
expired.resolve();
await sleep(10);
return {
state: {},
};
return asOk({ state: {} });
},
cancel: shouldRun,
},
@ -189,9 +188,7 @@ describe('TaskPool', () => {
...mockTask(),
async run() {
await sleep(10);
return {
state: {},
};
return asOk({ state: {} });
},
cancel: shouldNotRun,
},
@ -225,9 +222,7 @@ describe('TaskPool', () => {
async run() {
this.isExpired = true;
await sleep(10);
return {
state: {},
};
return asOk({ state: {} });
},
async cancel() {
cancelled.resolve();
@ -253,13 +248,14 @@ describe('TaskPool', () => {
function mockRun() {
return jest.fn(async () => {
await sleep(0);
return { state: {} };
return asOk({ state: {} });
});
}
function mockTask() {
return {
isExpired: false,
id: 'foo',
cancel: async () => undefined,
markTaskAsRunning: jest.fn(async () => true),
run: mockRun(),

View file

@ -59,6 +59,13 @@ export class TaskPool {
return this.maxWorkers - this.occupiedWorkers;
}
/**
* Gets whether there are any workers currently available.
*/
public get hasAvailableWorkers() {
return this.availableWorkers > 0;
}
/**
* Attempts to run the specified list of tasks. Returns true if it was able
* to start every task in the list, false if there was not enough capacity
@ -85,18 +92,18 @@ export class TaskPool {
performance.mark('attemptToRun_start');
await Promise.all(
tasksToRun.map(
async task =>
await task
async taskRunner =>
await taskRunner
.markTaskAsRunning()
.then((hasTaskBeenMarkAsRunning: boolean) =>
hasTaskBeenMarkAsRunning
? this.handleMarkAsRunning(task)
: this.handleFailureOfMarkAsRunning(task, {
? this.handleMarkAsRunning(taskRunner)
: this.handleFailureOfMarkAsRunning(taskRunner, {
name: 'TaskPoolVersionConflictError',
message: VERSION_CONFLICT_MESSAGE,
})
)
.catch(ex => this.handleFailureOfMarkAsRunning(task, ex))
.catch(err => this.handleFailureOfMarkAsRunning(taskRunner, err))
)
);
@ -113,14 +120,14 @@ export class TaskPool {
return TaskPoolRunResult.RunningAllClaimedTasks;
}
private handleMarkAsRunning(task: TaskRunner) {
this.running.add(task);
task
private handleMarkAsRunning(taskRunner: TaskRunner) {
this.running.add(taskRunner);
taskRunner
.run()
.catch(err => {
this.logger.warn(`Task ${task.toString()} failed in attempt to run: ${err.message}`);
this.logger.warn(`Task ${taskRunner.toString()} failed in attempt to run: ${err.message}`);
})
.then(() => this.running.delete(task));
.then(() => this.running.delete(taskRunner));
}
private handleFailureOfMarkAsRunning(task: TaskRunner, err: Error) {

View file

@ -7,7 +7,9 @@
import _ from 'lodash';
import sinon from 'sinon';
import { minutesFromNow } from './lib/intervals';
import { ConcreteTaskInstance } from './task';
import { asOk, asErr } from './lib/result_type';
import { TaskEvent, asTaskRunEvent, asTaskMarkRunningEvent } from './task_events';
import { ConcreteTaskInstance, TaskStatus } from './task';
import { TaskManagerRunner } from './task_runner';
import { mockLogger } from './test_utils';
// Task manager uses an unconventional directory structure so the linter marks this as a violation, server files should
@ -91,7 +93,7 @@ describe('TaskManagerRunner', () => {
const { runner, store } = testOpts({
instance: {
interval: '10m',
status: 'running',
status: TaskStatus.Running,
startedAt: new Date(),
},
definitions: {
@ -660,45 +662,245 @@ describe('TaskManagerRunner', () => {
);
});
describe('TaskEvents', () => {
test('emits TaskEvent when a task is marked as running', async () => {
const id = _.random(1, 20).toString();
const onTaskEvent = jest.fn();
const { runner, instance, store } = testOpts({
onTaskEvent,
instance: {
id,
},
definitions: {
bar: {
timeout: `1m`,
getRetry: () => {},
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});
store.update.returns(instance);
await runner.markTaskAsRunning();
expect(onTaskEvent).toHaveBeenCalledWith(asTaskMarkRunningEvent(id, asOk(instance)));
});
test('emits TaskEvent when a task fails to be marked as running', async () => {
expect.assertions(2);
const id = _.random(1, 20).toString();
const onTaskEvent = jest.fn();
const { runner, store } = testOpts({
onTaskEvent,
instance: {
id,
},
definitions: {
bar: {
timeout: `1m`,
getRetry: () => {},
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});
store.update.throws(new Error('cant mark as running'));
try {
await runner.markTaskAsRunning();
} catch (err) {
expect(onTaskEvent).toHaveBeenCalledWith(asTaskMarkRunningEvent(id, asErr(err)));
}
expect(onTaskEvent).toHaveBeenCalledTimes(1);
});
test('emits TaskEvent when a task is run successfully', async () => {
const id = _.random(1, 20).toString();
const onTaskEvent = jest.fn();
const { runner, instance } = testOpts({
onTaskEvent,
instance: {
id,
},
definitions: {
bar: {
createTaskRunner: () => ({
async run() {
return {};
},
}),
},
},
});
await runner.run();
expect(onTaskEvent).toHaveBeenCalledWith(asTaskRunEvent(id, asOk(instance)));
});
test('emits TaskEvent when a recurring task is run successfully', async () => {
const id = _.random(1, 20).toString();
const runAt = minutesFromNow(_.random(5));
const onTaskEvent = jest.fn();
const { runner, instance } = testOpts({
onTaskEvent,
instance: {
id,
interval: '1m',
},
definitions: {
bar: {
createTaskRunner: () => ({
async run() {
return { runAt };
},
}),
},
},
});
await runner.run();
expect(onTaskEvent).toHaveBeenCalledWith(asTaskRunEvent(id, asOk(instance)));
});
test('emits TaskEvent when a task run throws an error', async () => {
const id = _.random(1, 20).toString();
const error = new Error('Dangit!');
const onTaskEvent = jest.fn();
const { runner } = testOpts({
onTaskEvent,
instance: {
id,
},
definitions: {
bar: {
createTaskRunner: () => ({
async run() {
throw error;
},
}),
},
},
});
await runner.run();
expect(onTaskEvent).toHaveBeenCalledWith(asTaskRunEvent(id, asErr(error)));
expect(onTaskEvent).toHaveBeenCalledTimes(1);
});
test('emits TaskEvent when a task run returns an error', async () => {
const id = _.random(1, 20).toString();
const error = new Error('Dangit!');
const onTaskEvent = jest.fn();
const { runner } = testOpts({
onTaskEvent,
instance: {
id,
interval: '1m',
startedAt: new Date(),
},
definitions: {
bar: {
createTaskRunner: () => ({
async run() {
return { error };
},
}),
},
},
});
await runner.run();
expect(onTaskEvent).toHaveBeenCalledWith(asTaskRunEvent(id, asErr(error)));
expect(onTaskEvent).toHaveBeenCalledTimes(1);
});
test('emits TaskEvent when a task returns an error and is marked as failed', async () => {
const id = _.random(1, 20).toString();
const error = new Error('Dangit!');
const onTaskEvent = jest.fn();
const { runner, store } = testOpts({
onTaskEvent,
instance: {
id,
startedAt: new Date(),
},
definitions: {
bar: {
getRetry: () => false,
createTaskRunner: () => ({
async run() {
return { error };
},
}),
},
},
});
await runner.run();
const instance = store.update.args[0][0];
expect(instance.status).toBe('failed');
expect(onTaskEvent).toHaveBeenCalledWith(asTaskRunEvent(id, asErr(error)));
expect(onTaskEvent).toHaveBeenCalledTimes(1);
});
});
interface TestOpts {
instance?: Partial<ConcreteTaskInstance>;
definitions?: any;
onTaskEvent?: (event: TaskEvent<any, any>) => void;
}
function testOpts(opts: TestOpts) {
const callCluster = sinon.stub();
const createTaskRunner = sinon.stub();
const logger = mockLogger();
const instance = Object.assign(
{
id: 'foo',
taskType: 'bar',
sequenceNumber: 32,
primaryTerm: 32,
runAt: new Date(),
scheduledAt: new Date(),
startedAt: null,
retryAt: null,
attempts: 0,
params: {},
scope: ['reporting'],
state: {},
status: 'idle',
user: 'example',
ownerId: null,
},
opts.instance || {}
);
const store = {
update: sinon.stub(),
remove: sinon.stub(),
maxAttempts: 5,
};
store.update.returns(instance);
const runner = new TaskManagerRunner({
beforeRun: context => Promise.resolve(context),
beforeMarkRunning: context => Promise.resolve(context),
logger,
store,
instance: Object.assign(
{
id: 'foo',
taskType: 'bar',
sequenceNumber: 32,
primaryTerm: 32,
runAt: new Date(),
scheduledAt: new Date(),
startedAt: null,
retryAt: null,
attempts: 0,
params: {},
scope: ['reporting'],
state: {},
status: 'idle',
user: 'example',
ownerId: null,
},
opts.instance || {}
),
instance,
definitions: Object.assign(opts.definitions || {}, {
testbar: {
type: 'bar',
@ -706,6 +908,7 @@ describe('TaskManagerRunner', () => {
createTaskRunner,
},
}),
onTaskEvent: opts.onTaskEvent,
});
return {
@ -714,6 +917,7 @@ describe('TaskManagerRunner', () => {
runner,
logger,
store,
instance,
};
}

View file

@ -12,6 +12,10 @@
import { performance } from 'perf_hooks';
import Joi from 'joi';
import { identity, defaults, flow } from 'lodash';
import { asOk, asErr, mapErr, eitherAsync, unwrap, mapOk, Result } from './lib/result_type';
import { TaskRun, TaskMarkRunning, asTaskRunEvent, asTaskMarkRunningEvent } from './task_events';
import { intervalFromDate, intervalFromNow } from './lib/intervals';
import { Logger } from './types';
import { BeforeRunFunction, BeforeMarkRunningFunction } from './lib/middleware';
@ -20,6 +24,9 @@ import {
CancellableTask,
ConcreteTaskInstance,
RunResult,
SuccessfulRunResult,
FailedRunResult,
FailedTaskResult,
TaskDefinition,
TaskDictionary,
validateRunResult,
@ -27,12 +34,14 @@ import {
} from './task';
const defaultBackoffPerFailure = 5 * 60 * 1000;
const EMPTY_RUN_RESULT: SuccessfulRunResult = {};
export interface TaskRunner {
isExpired: boolean;
cancel: CancelFunction;
markTaskAsRunning: () => Promise<boolean>;
run: () => Promise<RunResult>;
run: () => Promise<Result<SuccessfulRunResult, FailedRunResult>>;
id: string;
toString: () => string;
}
@ -49,6 +58,7 @@ interface Opts {
store: Updatable;
beforeRun: BeforeRunFunction;
beforeMarkRunning: BeforeMarkRunningFunction;
onTaskEvent?: (event: TaskRun | TaskMarkRunning) => void;
}
/**
@ -67,6 +77,7 @@ export class TaskManagerRunner implements TaskRunner {
private bufferedTaskStore: Updatable;
private beforeRun: BeforeRunFunction;
private beforeMarkRunning: BeforeMarkRunningFunction;
private onTaskEvent: (event: TaskRun | TaskMarkRunning) => void;
/**
* Creates an instance of TaskManagerRunner.
@ -78,13 +89,22 @@ export class TaskManagerRunner implements TaskRunner {
* @prop {BeforeRunFunction} beforeRun - A function that adjusts the run context prior to running the task
* @memberof TaskManagerRunner
*/
constructor(opts: Opts) {
this.instance = sanitizeInstance(opts.instance);
this.definitions = opts.definitions;
this.logger = opts.logger;
this.bufferedTaskStore = opts.store;
this.beforeRun = opts.beforeRun;
this.beforeMarkRunning = opts.beforeMarkRunning;
constructor({
instance,
definitions,
logger,
store,
beforeRun,
beforeMarkRunning,
onTaskEvent = identity,
}: Opts) {
this.instance = sanitizeInstance(instance);
this.definitions = definitions;
this.logger = logger;
this.bufferedTaskStore = store;
this.beforeRun = beforeRun;
this.beforeMarkRunning = beforeMarkRunning;
this.onTaskEvent = onTaskEvent;
}
/**
@ -128,9 +148,9 @@ export class TaskManagerRunner implements TaskRunner {
* into the total timeout time the task is configured with. We may decide to
* start the timer after beforeRun resolves
*
* @returns {Promise<RunResult>}
* @returns {Promise<Result<SuccessfulRunResult, FailedRunResult>>}
*/
public async run(): Promise<RunResult> {
public async run(): Promise<Result<SuccessfulRunResult, FailedRunResult>> {
this.logger.debug(`Running task ${this}`);
const modifiedContext = await this.beforeRun({
taskInstance: this.instance,
@ -143,10 +163,9 @@ export class TaskManagerRunner implements TaskRunner {
return this.processResult(validatedResult);
} catch (err) {
this.logger.error(`Task ${this} failed: ${err}`);
// in error scenario, we can not get the RunResult
// re-use modifiedContext's state, which is correct as of beforeRun
return this.processResult({ error: err, state: modifiedContext.taskInstance.state });
return this.processResult(asErr({ error: err, state: modifiedContext.taskInstance.state }));
}
}
@ -183,7 +202,7 @@ export class TaskManagerRunner implements TaskRunner {
this.instance = await this.bufferedTaskStore.update({
...taskInstance,
status: 'running',
status: TaskStatus.Running,
startedAt: now,
attempts,
retryAt: this.instance.interval
@ -209,9 +228,11 @@ export class TaskManagerRunner implements TaskRunner {
}
performanceStopMarkingTaskAsRunning();
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asOk(this.instance)));
return true;
} catch (error) {
performanceStopMarkingTaskAsRunning();
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asErr(error)));
if (error.statusCode !== VERSION_CONFLICT_STATUS) {
throw error;
}
@ -234,61 +255,92 @@ export class TaskManagerRunner implements TaskRunner {
this.logger.warn(`The task ${this} is not cancellable.`);
}
private validateResult(result?: RunResult | void): RunResult {
private validateResult(result?: RunResult | void): Result<SuccessfulRunResult, FailedRunResult> {
const { error } = Joi.validate(result, validateRunResult);
if (error) {
this.logger.warn(`Invalid task result for ${this}: ${error.message}`);
return asErr({
error: new Error(`Invalid task result for ${this}: ${error.message}`),
state: {},
});
}
if (!result) {
return asOk(EMPTY_RUN_RESULT);
}
return result || { state: {} };
return result.error ? asErr({ ...result, error: result.error as Error }) : asOk(result);
}
private async processResultForRecurringTask(result: RunResult): Promise<RunResult> {
// recurring task: update the task instance
const startedAt = this.instance.startedAt!;
const state = result.state || this.instance.state || {};
let status: TaskStatus = this.getInstanceStatus();
private shouldTryToScheduleRetry(): boolean {
if (this.instance.interval) {
return true;
}
let runAt;
if (status === 'failed') {
// task run errored, keep the same runAt
runAt = this.instance.runAt;
} else if (result.runAt) {
runAt = result.runAt;
} else if (result.error) {
// when result.error is truthy, then we're retrying because it failed
const newRunAt = this.instance.interval
? intervalFromDate(startedAt, this.instance.interval)!
: this.getRetryDelay({
attempts: this.instance.attempts,
error: result.error,
});
if (!newRunAt) {
status = 'failed';
runAt = this.instance.runAt;
const maxAttempts = this.definition.maxAttempts || this.bufferedTaskStore.maxAttempts;
return this.instance.attempts < maxAttempts;
}
private rescheduleFailedRun = (
failureResult: FailedRunResult
): Result<SuccessfulRunResult, FailedTaskResult> => {
if (this.shouldTryToScheduleRetry()) {
const { runAt, state, error } = failureResult;
// if we're retrying, keep the number of attempts
const { interval, attempts } = this.instance;
if (runAt || interval) {
return asOk({ state, attempts, runAt });
} else {
runAt = newRunAt;
// when result.error is truthy, then we're retrying because it failed
const newRunAt = this.getRetryDelay({
attempts,
error,
});
if (newRunAt) {
return asOk({ state, attempts, runAt: newRunAt });
}
}
} else {
runAt = intervalFromDate(startedAt, this.instance.interval)!;
}
// scheduling a retry isn't possible, mark the task as failed
return asErr({ status: TaskStatus.Failed });
};
await this.bufferedTaskStore.update({
...this.instance,
runAt,
state,
status,
startedAt: null,
retryAt: null,
ownerId: null,
attempts: result.error ? this.instance.attempts : 0,
});
private async processResultForRecurringTask(
result: Result<SuccessfulRunResult, FailedRunResult>
): Promise<void> {
const fieldUpdates = flow(
// if running the task has failed, try to correct by scheduling a retry in the near future
mapErr(this.rescheduleFailedRun),
// if retrying is possible (new runAt) or this is simply an interval
// based task - reschedule
mapOk(({ runAt, state, attempts = 0 }: Partial<ConcreteTaskInstance>) => {
const { startedAt, interval } = this.instance;
return asOk({
runAt: runAt || intervalFromDate(startedAt!, interval)!,
state,
attempts,
status: TaskStatus.Idle,
});
}),
unwrap
)(result);
return result;
await this.bufferedTaskStore.update(
defaults(
{
...fieldUpdates,
// reset fields that track the lifecycle of the concluded `task run`
startedAt: null,
retryAt: null,
ownerId: null,
},
this.instance
)
);
}
private async processResultWhenDone(result: RunResult): Promise<RunResult> {
private async processResultWhenDone(): Promise<void> {
// not a recurring task: clean up by removing the task instance from store
try {
await this.bufferedTaskStore.remove(this.instance.id);
@ -299,28 +351,29 @@ export class TaskManagerRunner implements TaskRunner {
throw err;
}
}
return result;
}
private async processResult(result: RunResult): Promise<RunResult> {
if (result.runAt || this.instance.interval || result.error) {
await this.processResultForRecurringTask(result);
} else {
await this.processResultWhenDone(result);
}
private async processResult(
result: Result<SuccessfulRunResult, FailedRunResult>
): Promise<Result<SuccessfulRunResult, FailedRunResult>> {
await eitherAsync(
result,
async ({ runAt }: SuccessfulRunResult) => {
if (runAt || this.instance.interval) {
await this.processResultForRecurringTask(result);
} else {
await this.processResultWhenDone();
}
this.onTaskEvent(asTaskRunEvent(this.id, asOk(this.instance)));
},
async ({ error }: FailedRunResult) => {
await this.processResultForRecurringTask(result);
this.onTaskEvent(asTaskRunEvent(this.id, asErr(error)));
}
);
return result;
}
private getInstanceStatus() {
if (this.instance.interval) {
return 'idle';
}
const maxAttempts = this.definition.maxAttempts || this.bufferedTaskStore.maxAttempts;
return this.instance.attempts < maxAttempts ? 'idle' : 'failed';
}
private getRetryDelay({
error,
attempts,
@ -347,7 +400,6 @@ export class TaskManagerRunner implements TaskRunner {
if (addDuration && result) {
result = intervalFromDate(result, addDuration)!;
}
return result;
}
}

View file

@ -7,7 +7,15 @@
import _ from 'lodash';
import sinon from 'sinon';
import uuid from 'uuid';
import { TaskDictionary, TaskDefinition, TaskInstance, TaskStatus } from './task';
import { filter } from 'rxjs/operators';
import {
TaskDictionary,
TaskDefinition,
TaskInstance,
TaskStatus,
TaskLifecycleResult,
} from './task';
import { FetchOpts, StoreOpts, OwnershipClaimingOpts, TaskStore } from './task_store';
// Task manager uses an unconventional directory structure so the linter marks this as a violation, server files should
// be moved under task_manager/server/
@ -15,6 +23,10 @@ import { FetchOpts, StoreOpts, OwnershipClaimingOpts, TaskStore } from './task_s
import { savedObjectsClientMock } from 'src/core/server/mocks';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { SavedObjectsSerializer, SavedObjectsSchema, SavedObjectAttributes } from 'src/core/server';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { SavedObjectsErrorHelpers } from '../../../../src/core/server/saved_objects/service/lib/errors';
import { asTaskClaimEvent, TaskEvent } from './task_events';
import { asOk, asErr } from './lib/result_type';
const taskDefinitions: TaskDictionary<TaskDefinition> = {
report: {
@ -509,6 +521,178 @@ describe('TaskStore', () => {
});
});
test('it supports claiming specific tasks by id', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const {
args: {
updateByQuery: {
body: { query, sort },
},
},
} = await testClaimAvailableTasks({
opts: {
maxAttempts,
definitions: {
foo: {
type: 'foo',
title: '',
createTaskRunner: jest.fn(),
},
bar: {
type: 'bar',
title: '',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
},
},
claimingOpts: {
claimOwnershipUntil: new Date(),
size: 10,
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
},
});
expect(query).toMatchObject({
bool: {
must: [
{ term: { type: 'task' } },
{
bool: {
should: [
{
bool: {
must: [
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.interval' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'idle' } },
{ term: { 'task.status': 'failed' } },
],
},
},
{
ids: {
values: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
},
},
],
},
},
],
},
},
],
},
});
expect(sort).toMatchObject({
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'painless',
source: `
if(params.ids.contains(doc['_id'].value)){
return 0;
}
if (doc['task.retryAt'].size()!=0) {
return doc['task.retryAt'].value.toInstant().toEpochMilli();
}
if (doc['task.runAt'].size()!=0) {
return doc['task.runAt'].value.toInstant().toEpochMilli();
}
`,
params: {
ids: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
},
},
},
});
});
test('it claims tasks by setting their ownerId, status and retryAt', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
@ -740,6 +924,355 @@ describe('TaskStore', () => {
expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id);
});
});
describe('get', () => {
test('gets the task with the specified id', async () => {
const id = `id-${_.random(1, 20)}`;
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id,
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
version: '123',
ownerId: null,
};
const callCluster = jest.fn();
savedObjectsClient.get.mockImplementation(async (type: string, objectId: string) => ({
id: objectId,
type,
attributes: {
..._.omit(task, 'id'),
..._.mapValues(_.pick(task, 'params', 'state'), value => JSON.stringify(value)),
},
references: [],
version: '123',
}));
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
const result = await store.get(id);
expect(result).toEqual(task);
expect(savedObjectsClient.get).toHaveBeenCalledWith('task', id);
});
});
describe('getLifecycle', () => {
test('returns the task status if the task exists ', async () => {
expect.assertions(4);
return Promise.all(
Object.values(TaskStatus).map(async status => {
const id = `id-${_.random(1, 20)}`;
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id,
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: status as TaskStatus,
version: '123',
ownerId: null,
};
const callCluster = jest.fn();
savedObjectsClient.get.mockImplementation(async (type: string, objectId: string) => ({
id: objectId,
type,
attributes: {
..._.omit(task, 'id'),
..._.mapValues(_.pick(task, 'params', 'state'), value => JSON.stringify(value)),
},
references: [],
version: '123',
}));
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
expect(await store.getLifecycle(id)).toEqual(status);
})
);
});
test('returns NotFound status if the task does not exist', async () => {
const id = `id-${_.random(1, 20)}`;
savedObjectsClient.get.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id')
);
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
expect(await store.getLifecycle(id)).toEqual(TaskLifecycleResult.NotFound);
});
test('throws if an unknown error takes place ', async () => {
const id = `id-${_.random(1, 20)}`;
savedObjectsClient.get.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createBadRequestError()
);
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
return expect(store.getLifecycle(id)).rejects.toThrow('Bad Request');
});
});
describe('task events', () => {
function generateTasks() {
const taskManagerId = uuid.v1();
const runAt = new Date();
const tasks = [
{
_id: 'aaa',
_source: {
type: 'task',
task: {
runAt,
taskType: 'foo',
interval: undefined,
attempts: 0,
status: 'idle',
params: '{ "hello": "world" }',
state: '{ "baby": "Henhen" }',
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
},
},
_seq_no: 1,
_primary_term: 2,
sort: ['a', 1],
},
{
_id: 'bbb',
_source: {
type: 'task',
task: {
runAt,
taskType: 'bar',
interval: '5m',
attempts: 2,
status: 'running',
params: '{ "shazm": 1 }',
state: '{ "henry": "The 8th" }',
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
},
},
_seq_no: 3,
_primary_term: 4,
sort: ['b', 2],
},
];
return { taskManagerId, runAt, tasks };
}
test('emits an event when a task is successfully claimed by id', async done => {
const { taskManagerId, runAt, tasks } = generateTasks();
const callCluster = sinon.spy(async (name: string, params?: any) =>
name === 'updateByQuery'
? {
total: tasks.length,
updated: tasks.length,
}
: { hits: { hits: tasks } }
);
const store = new TaskStore({
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
serializer,
savedObjectsRepository: savedObjectsClient,
taskManagerId,
index: '',
});
const sub = store.events
.pipe(filter((event: TaskEvent<any, any>) => event.id === 'aaa'))
.subscribe({
next: (event: TaskEvent<any, any>) => {
expect(event).toMatchObject(
asTaskClaimEvent(
'aaa',
asOk({
id: 'aaa',
runAt,
taskType: 'foo',
interval: undefined,
attempts: 0,
status: 'idle' as TaskStatus,
params: { hello: 'world' },
state: { baby: 'Henhen' },
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
})
)
);
sub.unsubscribe();
done();
},
});
await store.claimAvailableTasks({
claimTasksById: ['aaa'],
claimOwnershipUntil: new Date(),
size: 10,
});
});
test('emits an event when a task is successfully claimed by scheduling', async done => {
const { taskManagerId, runAt, tasks } = generateTasks();
const callCluster = sinon.spy(async (name: string, params?: any) =>
name === 'updateByQuery'
? {
total: tasks.length,
updated: tasks.length,
}
: { hits: { hits: tasks } }
);
const store = new TaskStore({
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
serializer,
savedObjectsRepository: savedObjectsClient,
taskManagerId,
index: '',
});
const sub = store.events
.pipe(filter((event: TaskEvent<any, any>) => event.id === 'bbb'))
.subscribe({
next: (event: TaskEvent<any, any>) => {
expect(event).toMatchObject(
asTaskClaimEvent(
'bbb',
asOk({
id: 'bbb',
runAt,
taskType: 'bar',
interval: '5m',
attempts: 2,
status: 'running' as TaskStatus,
params: { shazm: 1 },
state: { henry: 'The 8th' },
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
})
)
);
sub.unsubscribe();
done();
},
});
await store.claimAvailableTasks({
claimTasksById: ['aaa'],
claimOwnershipUntil: new Date(),
size: 10,
});
});
test('emits an event when the store fails to claim a required task by id', async done => {
const { taskManagerId, tasks } = generateTasks();
const callCluster = sinon.spy(async (name: string, params?: any) =>
name === 'updateByQuery'
? {
total: tasks.length,
updated: tasks.length,
}
: { hits: { hits: tasks } }
);
const store = new TaskStore({
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
serializer,
savedObjectsRepository: savedObjectsClient,
taskManagerId,
index: '',
});
const sub = store.events
.pipe(filter((event: TaskEvent<any, any>) => event.id === 'ccc'))
.subscribe({
next: (event: TaskEvent<any, any>) => {
expect(event).toMatchObject(
asTaskClaimEvent('ccc', asErr(new Error(`failed to claim task 'ccc'`)))
);
sub.unsubscribe();
done();
},
});
await store.claimAvailableTasks({
claimTasksById: ['ccc'],
claimOwnershipUntil: new Date(),
size: 10,
});
});
});
});
function generateFakeTasks(count: number = 1) {

View file

@ -7,8 +7,9 @@
/*
* This module contains helpers for managing the task manager storage layer.
*/
import { Subject, Observable } from 'rxjs';
import { omit, difference } from 'lodash';
import { omit } from 'lodash';
import {
SavedObjectsClientContract,
SavedObject,
@ -19,14 +20,43 @@ import {
// be moved under task_manager/server/
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
} from 'src/core/server';
import { asOk, asErr } from './lib/result_type';
import {
ConcreteTaskInstance,
ElasticJs,
TaskDefinition,
TaskDictionary,
TaskInstance,
TaskLifecycle,
TaskLifecycleResult,
} from './task';
import { TaskClaim, asTaskClaimEvent } from './task_events';
import {
asUpdateByQuery,
shouldBeOneOf,
mustBeAllOf,
ExistsBoolClause,
TermBoolClause,
RangeBoolClause,
BoolClause,
IDsClause,
} from './queries/query_clauses';
import {
updateFields,
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
RecuringTaskWithInterval,
taskWithLessThanMaxAttempts,
SortByRunAtAndRetryAt,
taskWithIDsAndRunnableStatus,
sortByIdsThenByScheduling,
} from './queries/mark_available_tasks_as_claimed';
export interface StoreOpts {
callCluster: ElasticJs;
index: string;
@ -60,6 +90,7 @@ export interface UpdateByQueryOpts extends SearchOpts {
export interface OwnershipClaimingOpts {
claimOwnershipUntil: Date;
claimTasksById?: string[];
size: number;
}
@ -92,10 +123,12 @@ export class TaskStore {
public readonly maxAttempts: number;
public readonly index: string;
public readonly taskManagerId: string;
private callCluster: ElasticJs;
private definitions: TaskDictionary<TaskDefinition>;
private savedObjectsRepository: SavedObjectsClientContract;
private serializer: SavedObjectsSerializer;
private events$: Subject<TaskClaim>;
/**
* Constructs a new TaskStore.
@ -115,8 +148,17 @@ export class TaskStore {
this.definitions = opts.definitions;
this.serializer = opts.serializer;
this.savedObjectsRepository = opts.savedObjectsRepository;
this.events$ = new Subject<TaskClaim>();
}
public get events(): Observable<TaskClaim> {
return this.events$;
}
private emitEvent = (event: TaskClaim) => {
this.events$.next(event);
};
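A short sketch of consuming this events stream (the task id and the operator import are illustrative assumptions):

import { filter } from 'rxjs/operators';

const subscription = taskStore.events
  .pipe(filter((event: TaskClaim) => event.id === 'my-task-id'))
  .subscribe(event => {
    // event was produced via asTaskClaimEvent(id, Result<ConcreteTaskInstance, Error>),
    // so it carries either the claimed task instance or the claim failure for that id
    subscription.unsubscribe();
  });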
/**
* Schedules a task.
*
@ -163,101 +205,85 @@ export class TaskStore {
* @param {OwnershipClaimingOpts} options
* @returns {Promise<ClaimOwnershipResult>}
*/
public async claimAvailableTasks(opts: OwnershipClaimingOpts): Promise<ClaimOwnershipResult> {
const claimedTasks = await this.markAvailableTasksAsClaimed(opts);
const docs = claimedTasks > 0 ? await this.sweepForClaimedTasks(opts) : [];
public claimAvailableTasks = async ({
claimOwnershipUntil,
claimTasksById = [],
size,
}: OwnershipClaimingOpts): Promise<ClaimOwnershipResult> => {
const claimTasksByIdWithRawIds = claimTasksById.map(id =>
this.serializer.generateRawId(undefined, 'task', id)
);
const claimedTasks = await this.markAvailableTasksAsClaimed(
claimOwnershipUntil,
claimTasksByIdWithRawIds,
size
);
const docs =
claimedTasks > 0 ? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size) : [];
// emit success/fail events for claimed tasks by id
if (claimTasksById && claimTasksById.length) {
docs.map(doc => asTaskClaimEvent(doc.id, asOk(doc))).forEach(this.emitEvent);
difference(
claimTasksById,
docs.map(doc => doc.id)
)
.map(id => asTaskClaimEvent(id, asErr(new Error(`failed to claim task '${id}'`))))
.forEach(this.emitEvent);
}
return {
claimedTasks,
docs,
};
}
};
private async markAvailableTasksAsClaimed(
claimOwnershipUntil: OwnershipClaimingOpts['claimOwnershipUntil'],
claimTasksById: OwnershipClaimingOpts['claimTasksById'],
size: OwnershipClaimingOpts['size']
): Promise<number> {
const queryForScheduledTasks = mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has an interval or the attempts < the maximum configured
shouldBeOneOf<ExistsBoolClause | TermBoolClause | RangeBoolClause>(
RecuringTaskWithInterval,
...Object.entries(this.definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || this.maxAttempts)
)
)
);
const { query, sort } =
claimTasksById && claimTasksById.length
? {
query: shouldBeOneOf<
| ExistsBoolClause
| TermBoolClause
| RangeBoolClause
| BoolClause<TermBoolClause | IDsClause>
>(queryForScheduledTasks, taskWithIDsAndRunnableStatus(claimTasksById)),
sort: sortByIdsThenByScheduling(claimTasksById),
}
: {
query: queryForScheduledTasks,
sort: SortByRunAtAndRetryAt,
};
private async markAvailableTasksAsClaimed({
size,
claimOwnershipUntil,
}: OwnershipClaimingOpts): Promise<number> {
const { updated } = await this.updateByQuery(
{
query: {
bool: {
must: [
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
// Either task has an interval or the attempts < the maximum configured
{
bool: {
should: [
{ exists: { field: 'task.interval' } },
...Object.entries(this.definitions).map(([type, definition]) => ({
bool: {
must: [
{ term: { 'task.taskType': type } },
{
range: {
'task.attempts': {
lt: definition.maxAttempts || this.maxAttempts,
},
},
},
],
},
})),
],
},
},
],
},
},
sort: {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'expression',
source: `doc['task.retryAt'].value || doc['task.runAt'].value`,
},
},
},
seq_no_primary_term: true,
script: {
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
lang: 'painless',
params: {
ownerId: this.taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
},
},
},
asUpdateByQuery({
query,
update: updateFields({
ownerId: this.taskManagerId,
status: 'claiming',
retryAt: claimOwnershipUntil,
}),
sort,
}),
{
max_docs: size,
}
@ -268,9 +294,10 @@ export class TaskStore {
/**
* Fetches tasks from the index, which are owned by the current Kibana instance
*/
private async sweepForClaimedTasks({
size,
}: OwnershipClaimingOpts): Promise<ConcreteTaskInstance[]> {
private async sweepForClaimedTasks(
claimTasksById: OwnershipClaimingOpts['claimTasksById'],
size: OwnershipClaimingOpts['size']
): Promise<ConcreteTaskInstance[]> {
const { docs } = await this.search({
query: {
bool: {
@ -285,16 +312,10 @@ export class TaskStore {
},
},
size,
sort: {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'expression',
source: `doc['task.retryAt'].value || doc['task.runAt'].value`,
},
},
},
sort:
claimTasksById && claimTasksById.length
? sortByIdsThenByScheduling(claimTasksById)
: SortByRunAtAndRetryAt,
seq_no_primary_term: true,
});
@ -332,6 +353,34 @@ export class TaskStore {
await this.savedObjectsRepository.delete('task', id);
}
/**
* Gets a task by id
*
* @param {string} id
* @returns {Promise<ConcreteTaskInstance>}
*/
public async get(id: string): Promise<ConcreteTaskInstance> {
return savedObjectToConcreteTaskInstance(await this.savedObjectsRepository.get('task', id));
}
/**
* Gets task lifecycle step by id
*
* @param {string} id
* @returns {Promise<TaskLifecycle>}
*/
public async getLifecycle(id: string): Promise<TaskLifecycle> {
try {
const task = await this.get(id);
return task.status;
} catch (err) {
if (err.output && err.output.statusCode === 404) {
return TaskLifecycleResult.NotFound;
}
throw err;
}
}
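Usage sketch (the task id is an assumption): this lets callers, such as the runNow api, distinguish a task that no longer exists from one that is simply in a given lifecycle state.

const lifecycle = await taskStore.getLifecycle('my-task-id');
if (lifecycle === TaskLifecycleResult.NotFound) {
  // the task has been removed (e.g. a non-recurring task that already completed)
} else {
  // lifecycle is the task's current TaskStatus, e.g. 'idle', 'running', 'claiming' or 'failed'
}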
private async search(opts: SearchOpts = {}): Promise<FetchResult> {
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
@ -409,7 +458,7 @@ function taskInstanceToAttributes(doc: TaskInstance): SavedObjectAttributes {
};
}
function savedObjectToConcreteTaskInstance(
export function savedObjectToConcreteTaskInstance(
savedObject: Omit<SavedObject, 'references'>
): ConcreteTaskInstance {
return {

View file

@ -154,6 +154,7 @@
"proxyquire": "1.8.0",
"react-docgen-typescript-loader": "^3.1.1",
"react-test-renderer": "^16.12.0",
"rxjs-marbles": "^5.0.3",
"sass-loader": "^7.3.1",
"sass-resources-loader": "^2.0.1",
"simple-git": "1.129.0",

View file

@ -10,7 +10,7 @@ import { initRoutes } from './init_routes';
const once = function(emitter, event) {
return new Promise(resolve => {
emitter.once(event, resolve);
emitter.once(event, data => resolve(data || {}));
});
};
@ -30,49 +30,74 @@ export default function TaskTestingAPI(kibana) {
init(server) {
const taskManager = server.plugins.task_manager;
const defaultSampleTaskConfig = {
timeout: '1m',
// This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc)
// taskInstance.params has the following optional fields:
// nextRunMilliseconds: number - If specified, the run method will return a runAt that is now + nextRunMilliseconds
// failWith: string - If specified, the task will throw an error with the specified message
// failOn: number - If specified, the task will only throw the `failWith` error when `count` equals the failOn value
// waitForParams : boolean - should the task stall and wait to receive params asynchronously before using the default params
// waitForEvent : string - if provided, the task will stall (after completing the run) and wait for an async event before completing
createTaskRunner: ({ taskInstance }) => ({
async run() {
const { params, state, id } = taskInstance;
const prevState = state || { count: 0 };
const count = (prevState.count || 0) + 1;
const runParams = {
...params,
// if this task requires custom params provided async - wait for them
...(params.waitForParams ? await once(taskTestingEvents, id) : {}),
};
if (runParams.failWith) {
if (!runParams.failOn || (runParams.failOn && count === runParams.failOn)) {
throw new Error(runParams.failWith);
}
}
const callCluster = server.plugins.elasticsearch.getCluster('admin')
.callWithInternalUser;
await callCluster('index', {
index: '.kibana_task_manager_test_result',
body: {
type: 'task',
taskId: taskInstance.id,
params: JSON.stringify(runParams),
state: JSON.stringify(state),
ranAt: new Date(),
},
refresh: true,
});
// Stall task run until a certain event is triggered
if (runParams.waitForEvent) {
await once(taskTestingEvents, runParams.waitForEvent);
}
return {
state: { count },
runAt: millisecondsFromNow(runParams.nextRunMilliseconds),
};
},
}),
};
taskManager.registerTaskDefinitions({
sampleTask: {
...defaultSampleTaskConfig,
title: 'Sample Task',
description: 'A sample task for testing the task_manager.',
timeout: '1m',
// This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc)
// taskInstance.params has the following optional fields:
// nextRunMilliseconds: number - If specified, the run method will return a runAt that is now + nextRunMilliseconds
// failWith: string - If specified, the task will throw an error with the specified message
createTaskRunner: ({ taskInstance }) => ({
async run() {
const { params, state } = taskInstance;
const prevState = state || { count: 0 };
if (params.failWith) {
throw new Error(params.failWith);
}
const callCluster = server.plugins.elasticsearch.getCluster('admin')
.callWithInternalUser;
await callCluster('index', {
index: '.kibana_task_manager_test_result',
body: {
type: 'task',
taskId: taskInstance.id,
params: JSON.stringify(params),
state: JSON.stringify(state),
ranAt: new Date(),
},
refresh: true,
});
if (params.waitForEvent) {
await once(taskTestingEvents, params.waitForEvent);
}
return {
state: { count: (prevState.count || 0) + 1 },
runAt: millisecondsFromNow(params.nextRunMilliseconds),
};
},
}),
},
singleAttemptSampleTask: {
...defaultSampleTaskConfig,
title: 'Failing Sample Task',
description:
'A sample task for testing the task_manager that fails on the first attempt to run.',
// fail after the first failed run
maxAttempts: 1,
},
});
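Illustrative sketch of how a functional test can drive this sample task's params protocol (the `taskManager.schedule` call and the values here are assumptions for illustration):

const task = await taskManager.schedule({
  taskType: 'sampleTask',
  interval: '30m',
  params: { waitForParams: true },
});
// the task runner is now stalled on `once(taskTestingEvents, task.id)`;
// emitting an event named after the task id releases it with the provided params
taskTestingEvents.emit(task.id, { failWith: 'error on run now', failOn: 3 });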

View file

@ -59,6 +59,30 @@ export function initRoutes(server, taskTestingEvents) {
},
});
server.route({
path: '/api/sample_tasks/run_now',
method: 'POST',
config: {
validate: {
payload: Joi.object({
task: Joi.object({
id: Joi.string().optional(),
}),
}),
},
},
async handler(request) {
const {
task: { id },
} = request.payload;
try {
return await taskManager.runNow(id);
} catch (err) {
return { id, error: `${err}` };
}
},
});
server.route({
path: '/api/sample_tasks/ensure_scheduled',
method: 'POST',
@ -99,13 +123,16 @@ export function initRoutes(server, taskTestingEvents) {
validate: {
payload: Joi.object({
event: Joi.string().required(),
data: Joi.object()
.optional()
.default({}),
}),
},
},
async handler(request) {
try {
const { event } = request.payload;
taskTestingEvents.emit(event);
const { event, data } = request.payload;
taskTestingEvents.emit(event, data);
return { event };
} catch (err) {
return err;

View file

@ -74,6 +74,15 @@ export default function({ getService }) {
.then(response => response.body);
}
function runTaskNow(task) {
return supertest
.post('/api/sample_tasks/run_now')
.set('kbn-xsrf', 'xxx')
.send({ task })
.expect(200)
.then(response => response.body);
}
function scheduleTaskIfNotExists(task) {
return supertest
.post('/api/sample_tasks/ensure_scheduled')
@ -91,6 +100,24 @@ export default function({ getService }) {
.expect(200);
}
function getTaskById(tasks, id) {
return tasks.filter(task => task.id === id)[0];
}
async function provideParamsToTasksWaitingForParams(taskId, data = {}) {
// wait for task to start running and stall on waitForParams
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, taskId).status).to.eql('running');
});
return supertest
.post('/api/sample_tasks/event')
.set('kbn-xsrf', 'xxx')
.send({ event: taskId, data })
.expect(200);
}
it('should support middleware', async () => {
const historyItem = _.random(1, 100);
@ -193,7 +220,7 @@ export default function({ getService }) {
expect(task.attempts).to.eql(0);
expect(task.state.count).to.eql(count + 1);
expectReschedule(originalTask, task, nextRunMilliseconds);
expectReschedule(Date.parse(originalTask.runAt), task, nextRunMilliseconds);
});
});
@ -214,12 +241,196 @@ export default function({ getService }) {
expect(task.attempts).to.eql(0);
expect(task.state.count).to.eql(1);
expectReschedule(originalTask, task, intervalMilliseconds);
expectReschedule(Date.parse(originalTask.runAt), task, intervalMilliseconds);
});
});
async function expectReschedule(originalTask, currentTask, expectedDiff) {
const originalRunAt = Date.parse(originalTask.runAt);
it('should return a task run result when asked to run a task now', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
interval: `30m`,
params: {},
});
await retry.try(async () => {
const docs = await historyDocs();
expect(docs.filter(taskDoc => taskDoc._source.taskId === originalTask.id).length).to.eql(1);
const [task] = (await currentTasks()).docs.filter(
taskDoc => taskDoc.id === originalTask.id
);
expect(task.state.count).to.eql(1);
// ensure this task shouldn't run for another half hour
expectReschedule(Date.parse(originalTask.runAt), task, 30 * 60000);
});
const now = Date.now();
const runNowResult = await runTaskNow({
id: originalTask.id,
});
expect(runNowResult).to.eql({ id: originalTask.id });
await retry.try(async () => {
expect(
(await historyDocs()).filter(taskDoc => taskDoc._source.taskId === originalTask.id).length
).to.eql(2);
const [task] = (await currentTasks()).docs.filter(
taskDoc => taskDoc.id === originalTask.id
);
expect(task.state.count).to.eql(2);
// ensure this task shouldn't run for another half hour
expectReschedule(now, task, 30 * 60000);
});
});
it('should return a task run error result when running a task now fails', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
interval: `30m`,
params: { failWith: 'error on run now', failOn: 3 },
});
await retry.try(async () => {
const docs = await historyDocs();
expect(docs.filter(taskDoc => taskDoc._source.taskId === originalTask.id).length).to.eql(1);
const [task] = (await currentTasks()).docs.filter(
taskDoc => taskDoc.id === originalTask.id
);
expect(task.state.count).to.eql(1);
// ensure this task shouldn't run for another half hour
expectReschedule(Date.parse(originalTask.runAt), task, 30 * 60000);
});
// second run should still be successful
const successfulRunNowResult = await runTaskNow({
id: originalTask.id,
});
expect(successfulRunNowResult).to.eql({ id: originalTask.id });
await retry.try(async () => {
const [task] = (await currentTasks()).docs.filter(
taskDoc => taskDoc.id === originalTask.id
);
expect(task.state.count).to.eql(2);
});
// third run should fail
const failedRunNowResult = await runTaskNow({
id: originalTask.id,
});
expect(failedRunNowResult).to.eql({ id: originalTask.id, error: `Error: error on run now` });
await retry.try(async () => {
expect(
(await historyDocs()).filter(taskDoc => taskDoc._source.taskId === originalTask.id).length
).to.eql(2);
const [task] = (await currentTasks()).docs.filter(
taskDoc => taskDoc.id === originalTask.id
);
expect(task.attempts).to.eql(1);
});
});
it('should return a task run error result when trying to run a non-existent task', async () => {
// runNow should fail
const failedRunNowResult = await runTaskNow({
id: 'i-dont-exist',
});
expect(failedRunNowResult).to.eql({
error: `Error: Failed to run task "i-dont-exist" as it does not exist`,
id: 'i-dont-exist',
});
});
it('should return a task run error result when trying to run a task now which is already running', async () => {
const longRunningTask = await scheduleTask({
taskType: 'sampleTask',
interval: '30m',
params: {
waitForParams: true,
},
});
// tell the task to wait for the 'runNowHasBeenAttempted' event
await provideParamsToTasksWaitingForParams(longRunningTask.id, {
waitForEvent: 'runNowHasBeenAttempted',
});
await retry.try(async () => {
const docs = await historyDocs();
expect(docs.filter(taskDoc => taskDoc._source.taskId === longRunningTask.id).length).to.eql(
1
);
});
// first runNow should fail
const failedRunNowResult = await runTaskNow({
id: longRunningTask.id,
});
expect(failedRunNowResult).to.eql({
error: `Error: Failed to run task "${longRunningTask.id}" as it is currently running`,
id: longRunningTask.id,
});
// finish first run by emitting 'runNowHasBeenAttempted' event
await releaseTasksWaitingForEventToComplete('runNowHasBeenAttempted');
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1);
});
// second runNow should be successful
const successfulRunNowResult = runTaskNow({
id: longRunningTask.id,
});
await provideParamsToTasksWaitingForParams(longRunningTask.id);
expect(await successfulRunNowResult).to.eql({ id: longRunningTask.id });
});
it('should allow a failed task to be rerun using runNow', async () => {
const taskThatFailsBeforeRunNow = await scheduleTask({
taskType: 'singleAttemptSampleTask',
params: {
waitForParams: true,
},
});
// tell the task to fail on its next run
await provideParamsToTasksWaitingForParams(taskThatFailsBeforeRunNow.id, {
failWith: 'error on first run',
});
// wait for task to fail
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, taskThatFailsBeforeRunNow.id).status).to.eql('failed');
});
// runNow should successfully run the failing task
const runNowResultWithExpectedFailure = runTaskNow({
id: taskThatFailsBeforeRunNow.id,
});
// release the task without failing this time
await provideParamsToTasksWaitingForParams(taskThatFailsBeforeRunNow.id);
expect(await runNowResultWithExpectedFailure).to.eql({ id: taskThatFailsBeforeRunNow.id });
});
async function expectReschedule(originalRunAt, currentTask, expectedDiff) {
const buffer = 10000;
expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(
expectedDiff - buffer
@ -248,10 +459,6 @@ export default function({ getService }) {
},
});
function getTaskById(tasks, id) {
return tasks.filter(task => task.id === id)[0];
}
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, fastTask.id).state.count).to.eql(2);

View file

@ -263,6 +263,7 @@ function resetPerfState(target) {
fillPoolStarts: 0,
fillPoolCycles: 0,
fillPoolBail: 0,
claimedOnRerunCycle: 0,
fillPoolBailNoTasks: 0,
},
claimAvailableTasksNoTasks: 0,
@ -326,6 +327,9 @@ function resetPerfState(target) {
case 'fillPool.bailExhaustedCapacity':
performanceState.performance.cycles.fillPoolBail++;
break;
case 'fillPool.claimedOnRerunCycle':
performanceState.performance.cycles.claimedOnRerunCycle++;
break;
case 'fillPool.bailNoTasks':
performanceState.performance.cycles.fillPoolBail++;
performanceState.performance.cycles.fillPoolBailNoTasks++;

View file

@ -17,7 +17,13 @@ export default function({ getService }: { getService: (service: string) => any }
runningAverageTasksPerSecond,
runningAverageLeadTime,
// how often things happen in Task Manager
cycles: { fillPoolStarts, fillPoolCycles, fillPoolBail, fillPoolBailNoTasks },
cycles: {
fillPoolStarts,
fillPoolCycles,
claimedOnRerunCycle,
fillPoolBail,
fillPoolBailNoTasks,
},
claimAvailableTasksNoTasks,
claimAvailableTasksNoAvailableWorkers,
numberOfTasksRanOverall,
@ -70,9 +76,11 @@ export default function({ getService }: { getService: (service: string) => any }
)}]---> next markAsRunning`
);
log.info(`Duration of Perf Test: ${bright(perfTestDuration)}`);
log.info(`Activity within Task Poller: ${bright(activityDuration)}`);
log.info(`Activity within Task Pool: ${bright(activityDuration)}`);
log.info(`Inactivity due to Sleep: ${bright(sleepDuration)}`);
log.info(`Polling Cycles: ${colorizeCycles(fillPoolStarts, fillPoolCycles, fillPoolBail)}`);
log.info(
`Polling Cycles: ${colorizeCycles(fillPoolStarts, fillPoolCycles, claimedOnRerunCycle)}`
);
if (fillPoolBail > 0) {
log.info(` ⮑ Bailed due to:`);
if (fillPoolBailNoTasks > 0) {
@ -127,7 +135,11 @@ function colorize(avg: number) {
return avg < 500 ? green(`${avg}`) : avg < 1000 ? cyan(`${avg}`) : red(`${avg}`);
}
function colorizeCycles(fillPoolStarts: number, fillPoolCycles: number, fillPoolBail: number) {
function colorizeCycles(
fillPoolStarts: number,
fillPoolCycles: number,
claimedOnRerunCycle: number
) {
const perc = (fillPoolCycles * 100) / fillPoolStarts;
const colorFunc = perc >= 100 ? green : perc >= 50 ? cyan : red;
return (
@ -135,7 +147,7 @@ function colorizeCycles(fillPoolStarts: number, fillPoolCycles: number, fillPool
bright(`${fillPoolStarts}`) +
` cycles, of which ` +
colorFunc(`${fillPoolCycles}`) +
` were reran before bailing`
` were rerun (of which ${claimedOnRerunCycle} resulted in claiming) before bailing`
);
}

View file

@ -4657,9 +4657,9 @@
"@types/react" "*"
"@types/react@*", "@types/react@^16.8.23", "@types/react@^16.9.11", "@types/react@^16.9.13":
version "16.9.13"
resolved "https://registry.yarnpkg.com/@types/react/-/react-16.9.13.tgz#b3ea5dd443f4a680599e2abba8cc66f5e1ce0059"
integrity sha512-LikzRslbiufJYHyzbHSW0GrAiff8QYLMBFeZmSxzCYGXKxi8m/1PHX+rsVOwhr7mJNq+VIu2Dhf7U6mjFERK6w==
version "16.9.15"
resolved "https://registry.yarnpkg.com/@types/react/-/react-16.9.15.tgz#aeabb7a50f96c9e31a16079ada20ede9ed602977"
integrity sha512-WsmM1b6xQn1tG3X2Hx4F3bZwc2E82pJXt5OPs2YJgg71IzvUoKOSSSYOvLXYCg1ttipM+UuA4Lj3sfvqjVxyZw==
dependencies:
"@types/prop-types" "*"
csstype "^2.2.0"
@ -12956,6 +12956,11 @@ fast-diff@^1.1.2:
resolved "https://registry.yarnpkg.com/fast-diff/-/fast-diff-1.2.0.tgz#73ee11982d86caaf7959828d519cfe927fac5f03"
integrity sha512-xJuoT5+L99XlZ8twedaRf6Ax2TgQVxvgZOYoPKqZufmJib0tL2tegPBOZb1pVNgIhlqDlA0eO0c3wBvQcmzx4w==
fast-equals@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/fast-equals/-/fast-equals-2.0.0.tgz#bef2c423af3939f2c54310df54c57e64cd2adefc"
integrity sha512-u6RBd8cSiLLxAiC04wVsLV6GBFDOXcTCgWkd3wEoFXgidPSoAJENqC9m7Jb2vewSvjBIfXV6icKeh3GTKfIaXA==
fast-glob@^2.0.2:
version "2.0.4"
resolved "https://registry.yarnpkg.com/fast-glob/-/fast-glob-2.0.4.tgz#a4b9f49e36175f5ef1a3456f580226a6e7abcc9e"
@ -25787,6 +25792,13 @@ rx@^4.1.0:
resolved "https://registry.yarnpkg.com/rx/-/rx-4.1.0.tgz#a5f13ff79ef3b740fe30aa803fb09f98805d4782"
integrity sha1-pfE/957zt0D+MKqAP7CfmIBdR4I=
rxjs-marbles@^5.0.3:
version "5.0.3"
resolved "https://registry.yarnpkg.com/rxjs-marbles/-/rxjs-marbles-5.0.3.tgz#d3ca62a4e02d032b1b4ffd558e93336ad78fd100"
integrity sha512-JK6EvLe9uReJxBmUgdKrpMB2JswV+fDcKDg97x20LErLQ7Gi0FG3YEr2Uq9hvgHJjgZXGCvonpzcxARLzKsT4A==
dependencies:
fast-equals "^2.0.0"
rxjs@^5.0.0-beta.11, rxjs@^5.5.2:
version "5.5.12"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.12.tgz#6fa61b8a77c3d793dbaf270bee2f43f652d741cc"