mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
This reverts commit 8b7c513b97
.
This commit is contained in:
parent
8b7c513b97
commit
a239ca2d17
36 changed files with 2 additions and 4017 deletions
|
@ -45,7 +45,8 @@ const buildUiExports = _.once(async () => {
|
|||
* Deletes all indices that start with `.kibana`
|
||||
*/
|
||||
export async function deleteKibanaIndices({ client, stats }) {
|
||||
const indexNames = await fetchKibanaIndices(client);
|
||||
const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' });
|
||||
const indexNames = kibanaIndices.map(x => x.index);
|
||||
if (!indexNames.length) {
|
||||
return;
|
||||
}
|
||||
|
@ -101,17 +102,3 @@ async function loadElasticVersion() {
|
|||
const packageJson = await readFile(path.join(__dirname, '../../../../package.json'));
|
||||
return JSON.parse(packageJson).version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrations mean that the Kibana index will look something like:
|
||||
* .kibana, .kibana_1, .kibana_323, etc. This finds all indices starting
|
||||
* with .kibana, then filters out any that aren't actually Kibana's core
|
||||
* index (e.g. we don't want to remove .kibana_task_manager or the like).
|
||||
*
|
||||
* @param {string} index
|
||||
*/
|
||||
async function fetchKibanaIndices(client) {
|
||||
const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' });
|
||||
const isKibanaIndex = (index) => (/^\.kibana[_]{0,1}[0-9]*$/).test(index);
|
||||
return kibanaIndices.map(x => x.index).filter(isKibanaIndex);
|
||||
}
|
||||
|
|
|
@ -244,11 +244,9 @@ export default () => Joi.object({
|
|||
}),
|
||||
profile: Joi.boolean().default(false)
|
||||
}).default(),
|
||||
|
||||
status: Joi.object({
|
||||
allowAnonymous: Joi.boolean().default(false)
|
||||
}).default(),
|
||||
|
||||
map: Joi.object({
|
||||
includeElasticMapsService: Joi.boolean().default(true),
|
||||
tilemap: tilemapSchema,
|
||||
|
|
|
@ -29,10 +29,6 @@ export {
|
|||
validations,
|
||||
} from './saved_object';
|
||||
|
||||
export {
|
||||
taskDefinitions
|
||||
} from './task_definitions';
|
||||
|
||||
export {
|
||||
app,
|
||||
apps,
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import { mergeAtType } from './reduce';
|
||||
import { alias, wrap, uniqueKeys } from './modify_reduce';
|
||||
|
||||
// How plugins define tasks that the task manager can run.
|
||||
export const taskDefinitions = wrap(
|
||||
alias('taskDefinitions'),
|
||||
uniqueKeys(),
|
||||
mergeAtType,
|
||||
);
|
|
@ -27,7 +27,6 @@ import { notifications } from './plugins/notifications';
|
|||
import { kueryAutocomplete } from './plugins/kuery_autocomplete';
|
||||
import { canvas } from './plugins/canvas';
|
||||
import { infra } from './plugins/infra';
|
||||
import { taskManager } from './plugins/task_manager';
|
||||
|
||||
module.exports = function (kibana) {
|
||||
return [
|
||||
|
@ -54,6 +53,5 @@ module.exports = function (kibana) {
|
|||
notifications(kibana),
|
||||
kueryAutocomplete(kibana),
|
||||
infra(kibana),
|
||||
taskManager(kibana),
|
||||
];
|
||||
};
|
||||
|
|
|
@ -1,306 +0,0 @@
|
|||
# Kibana task manager
|
||||
|
||||
The task manager is a generic system for running background tasks. It supports:
|
||||
|
||||
- Single-run and recurring tasks
|
||||
- Scheduling tasks to run after a specified datetime
|
||||
- Basic retry logic
|
||||
- Recovery of stalled tasks / timeouts
|
||||
- Tracking task state across multiple runs
|
||||
- Configuring the run-parameters for specific tasks
|
||||
- Basic coordination to prevent the same task instance from running on more than one Kibana system at a time
|
||||
|
||||
## Implementation details
|
||||
|
||||
At a high-level, the task manager works like this:
|
||||
|
||||
- Every `{poll_interval}` milliseconds, check the `{index}` for any tasks that need to be run:
|
||||
- `runAt` is past
|
||||
- `attempts` is less than the configured threshold
|
||||
- Attempt to claim the task by using optimistic concurrency to set:
|
||||
- status to `running`
|
||||
- `runAt` to now + the timeout specified by the task
|
||||
- Execute the task, if the previous claim succeeded
|
||||
- If the task fails, increment the `attempts` count and reschedule it
|
||||
- If the task succeeds:
|
||||
- If it is recurring, store the result of the run, and reschedule
|
||||
- If it is not recurring, remove it from the index
|
||||
|
||||
## Pooling
|
||||
|
||||
Each task manager instance runs tasks in a pool which ensures that at most N tasks are run at a time, where N is configurable. This prevents the system from running too many tasks at once in resource constrained environments. In addition to this, each individual task can also specify `numWorkers` which tells the system how many workers are consumed by a single running instance of a task. This effectively limits how many tasks of a given type can be run at once.
|
||||
|
||||
For example, we may have a system with a `max_workers` of 10, but a super expensive task (such as `reporting`) which specifies a `numWorkers` of 10. In this case, `reporting` tasks will run one at a time.
|
||||
|
||||
If a task specifies a higher `numWorkers` than the system supports, the system's `max_workers` setting will be substituted for it.
|
||||
|
||||
## Config options
|
||||
|
||||
The task_manager can be configured via `taskManager` config options (e.g. `taskManager.maxAttempts`):
|
||||
|
||||
- `max_attempts` - How many times a failing task instance will be retried before it is never run again
|
||||
- `poll_interval` - How often the background worker should check the task_manager index for more work
|
||||
- `index` - The name of the index that the task_manager
|
||||
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
|
||||
- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security.
|
||||
- `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks
|
||||
- For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting`
|
||||
- This allows sysadmins to tweak the operational performance of Kibana, allowing more or fewer tasks of a specific type to run simultaneously
|
||||
|
||||
## Task definitions
|
||||
|
||||
Plugins define tasks by calling the `registerTaskDefinitions` method on the `server.taskManager` object.
|
||||
|
||||
A sample task can be found in the [x-pack/test/plugin_api_integration/plugins/task_manager](../../test/plugin_api_integration/plugins/task_manager/index.js) folder.
|
||||
|
||||
```js
|
||||
const { taskManager } = server;
|
||||
taskManager.registerTaskDefinitions({
|
||||
// clusterMonitoring is the task type, and must be unique across the entire system
|
||||
clusterMonitoring: {
|
||||
// Human friendly name, used to represent this task in logs, UI, etc
|
||||
title: 'Human friendly name',
|
||||
|
||||
// Optional, human-friendly, more detailed description
|
||||
description: 'Amazing!!',
|
||||
|
||||
// Optional, how long, in minutes, the system should wait before
|
||||
// a running instance of this task is considered to be timed out.
|
||||
// This defaults to 5 minutes.
|
||||
timeOut: '5m',
|
||||
|
||||
// The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots,
|
||||
// 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is
|
||||
// overridden by the `override_num_workers` config value, if specified.
|
||||
numWorkers: 2,
|
||||
|
||||
// The createTaskRunner function / method returns an object that is responsible for
|
||||
// performing the work of the task. context: { taskInstance, kbnServer }, is documented below.
|
||||
createTaskRunner(context) {
|
||||
return {
|
||||
// Perform the work of the task. The return value should fit the TaskResult interface, documented
|
||||
// below. Invalid return values will result in a logged warning.
|
||||
async run() {
|
||||
// Do some work
|
||||
// Conditionally send some alerts
|
||||
// Return some result or other...
|
||||
},
|
||||
|
||||
// Optional, will be called if a running instance of this task times out, allowing the task
|
||||
// to attempt to clean itself up.
|
||||
async cancel() {
|
||||
// Do whatever is required to cancel this task, such as killing any spawned processes
|
||||
},
|
||||
};
|
||||
}
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
When Kibana attempts to claim and run a task instance, it looks its definition up, and executes its createTaskRunner's method, passing it a run context which looks like this:
|
||||
|
||||
```js
|
||||
{
|
||||
// An instance of the Kibana server object.
|
||||
kbnServer,
|
||||
|
||||
// The data associated with this instance of the task, with two properties being most notable:
|
||||
//
|
||||
// params:
|
||||
// An object, specific to this task instance, used by the
|
||||
// task to determine exactly what work should be performed.
|
||||
// e.g. a cluster-monitoring task might have a `clusterName`
|
||||
// property in here, but a movie-monitoring task might have
|
||||
// a `directorName` property.
|
||||
//
|
||||
// state:
|
||||
// The state returned from the previous run of this task instance.
|
||||
// If this task instance has never succesfully run, this will
|
||||
// be an empty object: {}
|
||||
taskInstance,
|
||||
}
|
||||
```
|
||||
|
||||
## Task result
|
||||
|
||||
The task runner's `run` method is expected to return a promise that resolves to undefined or to an object that looks like the following:
|
||||
```js
|
||||
{
|
||||
// Optional, if specified, this is used as the tasks' nextRun, overriding
|
||||
// the default system scheduler.
|
||||
runAt: "2020-07-24T17:34:35.272Z",
|
||||
|
||||
// Optional, an error object, logged out as a warning. The pressence of this
|
||||
// property indicates that the task did not succeed.
|
||||
error: { message: 'Hrumph!' },
|
||||
|
||||
// Optional, this will be passed into the next run of the task, if
|
||||
// this is a recurring task.
|
||||
state: {
|
||||
anything: 'goes here',
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
Other return values will result in a warning, but the system should continue to work.
|
||||
|
||||
## Task instances
|
||||
|
||||
The task_manager module will store scheduled task instances in an index. This allows for recovery of failed tasks, coordination across Kibana clusters, persistence across Kibana reboots, etc.
|
||||
|
||||
The data stored for a task instance looks something like this:
|
||||
|
||||
```js
|
||||
{
|
||||
// The type of task that will run this instance.
|
||||
taskType: 'clusterMonitoring',
|
||||
|
||||
// The next time this task instance should run. It is not guaranteed
|
||||
// to run at this time, but it is guaranteed not to run earlier than
|
||||
// this.
|
||||
runAt: "2020-07-24T17:34:35.272Z",
|
||||
|
||||
// Indicates that this is a recurring task. We currently only support
|
||||
// 1 minute granularity.
|
||||
interval: '5m',
|
||||
|
||||
// How many times this task has been unsuccesfully attempted,
|
||||
// this will be reset to 0 if the task ever succesfully completes.
|
||||
// This is incremented if a task fails or times out.
|
||||
attempts: 0,
|
||||
|
||||
// Currently, this is either idle | running. It is used to
|
||||
// coordinate which Kibana instance owns / is running a specific
|
||||
// task instance.
|
||||
status: 'idle',
|
||||
|
||||
// The params specific to this task instance, which will be
|
||||
// passed to the task when it runs, and will be used by the
|
||||
// task to determine exactly what work should be performed.
|
||||
// This is a JSON blob, and will be different per task type.
|
||||
// e.g. a cluster-monitoring task might have a `clusterName`
|
||||
// property in here, but a movie-monitoring task might have
|
||||
// a `directorName` property.
|
||||
params: '{ "task": "specific stuff here" }',
|
||||
|
||||
// The result of the previous run of this task instance. This
|
||||
// will be passed to the next run of the task, along with the
|
||||
// params, and could be used by a task to do special logic If
|
||||
// the task state changes (e.g. from green to red, or foo to bar)
|
||||
// If there was no previous run (e.g. the instance has never succesfully
|
||||
// completed, this will be an empty object.). This is a JSON blob,
|
||||
// and will be different per task type.
|
||||
state: '{ "status": "green" }',
|
||||
|
||||
// An extension point for 3rd parties to build in security features on
|
||||
// top of the task manager. For example, this might be the token of the user
|
||||
// who scheduled this task.
|
||||
userContext: 'the token of the user who scheduled this task',
|
||||
|
||||
// An extension point for 3rd parties to build in security features on
|
||||
// top of the task manager, and is expected to be the id of the user, if any,
|
||||
// that scheduled this task.
|
||||
user: '23lk3l42',
|
||||
|
||||
// An application-specific designation, allowing different Kibana
|
||||
// plugins / apps to query for only those tasks they care about.
|
||||
scope: 'alerting',
|
||||
}
|
||||
```
|
||||
|
||||
## Programmatic access
|
||||
|
||||
The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected.
|
||||
|
||||
```js
|
||||
const { taskManager } = server;
|
||||
// Schedules a task. All properties are as documented in the previous
|
||||
// storage section, except that here, params is an object, not a JSON
|
||||
// string.
|
||||
const task = await taskManager.schedule({
|
||||
taskType,
|
||||
runAt,
|
||||
interval,
|
||||
params,
|
||||
scope: 'my-fanci-app',
|
||||
});
|
||||
|
||||
// Removes the specified task
|
||||
await manager.remove({ id: task.id });
|
||||
|
||||
// Fetches tasks, supports pagination, via the search-after API:
|
||||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html
|
||||
// If scope is not specified, all tasks are returned, otherwise only tasks
|
||||
// with the given scope are returned.
|
||||
const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] });
|
||||
|
||||
// results look something like this:
|
||||
{
|
||||
searchAfter: ['233322'],
|
||||
// Tasks is an array of task instances
|
||||
tasks: [{
|
||||
id: '3242342',
|
||||
taskType: 'reporting',
|
||||
// etc
|
||||
}]
|
||||
}
|
||||
```
|
||||
|
||||
More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time.
|
||||
|
||||
## Middleware
|
||||
|
||||
The task manager exposes a middleware layer that allows modifying tasks before they are scheduled / persisted to the task manager index, and modifying tasks / the run context before a task is run.
|
||||
|
||||
For example:
|
||||
|
||||
```js
|
||||
// In your plugin's init
|
||||
server.taskManager.addMiddleware({
|
||||
async beforeSave({ taskInstance, ...opts }) {
|
||||
console.log(`About to save a task of type ${taskInstance.taskType}`);
|
||||
|
||||
return {
|
||||
...opts,
|
||||
taskInstance: {
|
||||
...taskInstance,
|
||||
params: {
|
||||
...taskInstance.params,
|
||||
example: 'Added to params!',
|
||||
},
|
||||
},
|
||||
};
|
||||
},
|
||||
|
||||
async beforeRun({ taskInstance, ...opts }) {
|
||||
console.log(`About to run ${taskInstance.taskType} ${taskInstance.id}`);
|
||||
const { example, ...taskWithoutExampleProp } = taskInstance;
|
||||
|
||||
return {
|
||||
...opts,
|
||||
taskInstance: taskWithoutExampleProp,
|
||||
};
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
## Limitations in v1.0
|
||||
|
||||
In v1, the system only understands 1 minute increments (e.g. '1m', '7m'). Tasks which need something more robust will need to specify their own "runAt" in their run method's return value.
|
||||
|
||||
There is only a rudimentary mechanism for coordinating tasks and handling expired tasks. Tasks are considered expired if their runAt has arrived, and their status is still 'running'.
|
||||
|
||||
There is no task history. Each run overwrites the previous run's state. One-time tasks are removed from the index upon completion regardless of success / failure.
|
||||
|
||||
The task manager's public API is create / delete / list. Updates aren't directly supported, and listing should be scoped so that users only see their own tasks.
|
||||
|
||||
## Testing
|
||||
|
||||
- `node scripts/jest --testPathPattern=task_manager --watch`
|
||||
|
||||
Integration tests can be run like so:
|
||||
|
||||
```
|
||||
node scripts/functional_tests_server.js --config test/plugin_functional/config.js
|
||||
node scripts/functional_test_runner --config test/plugin_functional/config.js --grep task_manager
|
||||
```
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { TaskManager } from './task_manager';
|
||||
|
||||
export function taskManager(kibana) {
|
||||
return new kibana.Plugin({
|
||||
id: 'task_manager',
|
||||
require: ['kibana', 'elasticsearch', 'xpack_main'],
|
||||
configPrefix: 'xpack.task_manager',
|
||||
config(Joi) {
|
||||
return Joi.object({
|
||||
enabled: Joi.boolean().default(true),
|
||||
max_attempts: Joi.number()
|
||||
.description('The maximum number of times a task will be attempted before being abandoned as failed')
|
||||
.default(3),
|
||||
poll_interval: Joi.number()
|
||||
.description('How often, in milliseconds, the task manager will look for more work.')
|
||||
.default(3000),
|
||||
index: Joi.string()
|
||||
.description('The name of the index used to store task information.')
|
||||
.default('.kibana_task_manager'),
|
||||
max_workers: Joi.number()
|
||||
.description('The maximum number of tasks that this Kibana instance will run simultaneously.')
|
||||
.default(10),
|
||||
override_num_workers: Joi.object()
|
||||
.pattern(/.*/, Joi.number().greater(0))
|
||||
.description('Customize the number of workers occupied by specific tasks (e.g. override_num_workers.reporting: 2)')
|
||||
.default({})
|
||||
}).default();
|
||||
},
|
||||
preInit(server) {
|
||||
const config = server.config();
|
||||
const taskManager = new TaskManager(this.kbnServer, server, config);
|
||||
server.decorate('server', 'taskManager', taskManager);
|
||||
},
|
||||
});
|
||||
}
|
|
@ -1,47 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import sinon from 'sinon';
|
||||
import { fillPool } from './fill_pool';
|
||||
|
||||
describe('fillPool', () => {
|
||||
test('stops filling when there are no more tasks in the store', async () => {
|
||||
const tasks = [[1, 2, 3], [4, 5]];
|
||||
let index = 0;
|
||||
const fetchAvailableTasks = async () => tasks[index++] || [];
|
||||
const run = sinon.spy(() => true);
|
||||
const converter = _.identity;
|
||||
|
||||
await fillPool(run, fetchAvailableTasks, converter);
|
||||
|
||||
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3, 4, 5]);
|
||||
});
|
||||
|
||||
test('stops filling when the pool has no more capacity', async () => {
|
||||
const tasks = [[1, 2, 3], [4, 5]];
|
||||
let index = 0;
|
||||
const fetchAvailableTasks = async () => tasks[index++] || [];
|
||||
const run = sinon.spy(() => false);
|
||||
const converter = _.identity;
|
||||
|
||||
await fillPool(run, fetchAvailableTasks, converter);
|
||||
|
||||
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
test('calls the converter on the records prior to running', async () => {
|
||||
const tasks = [[1, 2, 3], [4, 5]];
|
||||
let index = 0;
|
||||
const fetchAvailableTasks = async () => tasks[index++] || [];
|
||||
const run = sinon.spy(() => false);
|
||||
const converter = (x: number) => x.toString();
|
||||
|
||||
await fillPool(run, fetchAvailableTasks, converter);
|
||||
|
||||
expect(_.flattenDeep(run.args)).toEqual(['1', '2', '3']);
|
||||
});
|
||||
});
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
type BatchRun<T> = (tasks: T[]) => Promise<boolean>;
|
||||
type Fetcher<T> = () => Promise<T[]>;
|
||||
type Converter<T1, T2> = (t: T1) => T2;
|
||||
|
||||
/**
|
||||
* Given a function that runs a batch of tasks (e.g. taskPool.run), a function
|
||||
* that fetches task records (e.g. store.fetchAvailableTasks), and a function
|
||||
* that converts task records to the appropriate task runner, this function
|
||||
* fills the pool with work.
|
||||
*
|
||||
* This is annoyingly general in order to simplify testing.
|
||||
*
|
||||
* @param run - a function that runs a batch of tasks (e.g. taskPool.run)
|
||||
* @param fetchAvailableTasks - a function that fetches task records (e.g. store.fetchAvailableTasks)
|
||||
* @param converter - a function that converts task records to the appropriate task runner
|
||||
*/
|
||||
export async function fillPool<TRecord, TRunner>(
|
||||
run: BatchRun<TRunner>,
|
||||
fetchAvailableTasks: Fetcher<TRecord>,
|
||||
converter: Converter<TRecord, TRunner>
|
||||
) {
|
||||
while (true) {
|
||||
const instances = await fetchAvailableTasks();
|
||||
|
||||
if (!instances.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const tasks = instances.map(converter);
|
||||
|
||||
if (!(await run(tasks))) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import { assertValidInterval, intervalFromNow, minutesFromNow } from './intervals';
|
||||
|
||||
describe('taskIntervals', () => {
|
||||
describe('assertValidInterval', () => {
|
||||
test('it accepts intervals in the form `Nm`', () => {
|
||||
expect(() => assertValidInterval(`${_.random(1000)}m`)).not.toThrow();
|
||||
});
|
||||
|
||||
test('it rejects intervals are not of the form `Nm`', () => {
|
||||
expect(() => assertValidInterval(`5m 2s`)).toThrow(
|
||||
/Invalid interval "5m 2s"\. Intervals must be of the form {numbrer}m. Example: 5m/
|
||||
);
|
||||
expect(() => assertValidInterval(`hello`)).toThrow(
|
||||
/Invalid interval "hello"\. Intervals must be of the form {numbrer}m. Example: 5m/
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('intervalFromNow', () => {
|
||||
test('it returns the current date plus n minutes', () => {
|
||||
const mins = _.random(1, 100);
|
||||
const expected = Date.now() + mins * 60 * 1000;
|
||||
const nextRun = intervalFromNow(`${mins}m`)!.getTime();
|
||||
expect(Math.abs(nextRun - expected)).toBeLessThan(100);
|
||||
});
|
||||
|
||||
test('it rejects intervals are not of the form `Nm`', () => {
|
||||
expect(() => intervalFromNow(`5m 2s`)).toThrow(
|
||||
/Invalid interval "5m 2s"\. Intervals must be of the form {numbrer}m. Example: 5m/
|
||||
);
|
||||
expect(() => intervalFromNow(`hello`)).toThrow(
|
||||
/Invalid interval "hello"\. Intervals must be of the form {numbrer}m. Example: 5m/
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('minutesFromNow', () => {
|
||||
test('it returns the current date plus a number of minutes', () => {
|
||||
const mins = _.random(1, 100);
|
||||
const expected = Date.now() + mins * 60 * 1000;
|
||||
const nextRun = minutesFromNow(mins).getTime();
|
||||
expect(Math.abs(nextRun - expected)).toBeLessThan(100);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,53 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Returns a date that is the specified interval from now. Currently,
|
||||
* only minute-intervals are supported.
|
||||
*
|
||||
* @param {string} interval - An interval of the form `Nm` such as `5m`
|
||||
*/
|
||||
export function intervalFromNow(interval?: string): Date | undefined {
|
||||
if (interval === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
assertValidInterval(interval);
|
||||
|
||||
return minutesFromNow(parseInterval(interval));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a date that is mins minutes from now.
|
||||
*
|
||||
* @param mins The number of mintues from now
|
||||
*/
|
||||
export function minutesFromNow(mins: number): Date {
|
||||
const now = new Date();
|
||||
|
||||
now.setMinutes(now.getMinutes() + mins);
|
||||
|
||||
return now;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the specified interval matches our expected format.
|
||||
*
|
||||
* @param {string} interval - An interval such as `5m`
|
||||
*/
|
||||
export function assertValidInterval(interval: string) {
|
||||
if (/^[0-9]+m$/.test(interval)) {
|
||||
return interval;
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`Invalid interval "${interval}". Intervals must be of the form {numbrer}m. Example: 5m.`
|
||||
);
|
||||
}
|
||||
|
||||
function parseInterval(interval: string) {
|
||||
return parseInt(interval, 10);
|
||||
}
|
|
@ -1,44 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
export type LogFn = (prefix: string[], msg: string) => void;
|
||||
|
||||
type SimpleLogFn = (msg: string) => void;
|
||||
|
||||
export interface Logger {
|
||||
error: SimpleLogFn;
|
||||
warning: SimpleLogFn;
|
||||
debug: SimpleLogFn;
|
||||
info: SimpleLogFn;
|
||||
}
|
||||
|
||||
export class TaskManagerLogger implements Logger {
|
||||
private write: LogFn;
|
||||
|
||||
constructor(log: LogFn) {
|
||||
this.write = log;
|
||||
}
|
||||
|
||||
public error(msg: string) {
|
||||
this.log('error', msg);
|
||||
}
|
||||
|
||||
public warning(msg: string) {
|
||||
this.log('warning', msg);
|
||||
}
|
||||
|
||||
public debug(msg: string) {
|
||||
this.log('debug', msg);
|
||||
}
|
||||
|
||||
public info(msg: string) {
|
||||
this.log('info', msg);
|
||||
}
|
||||
|
||||
private log(type: string, msg: string) {
|
||||
this.write([type, 'task_manager'], msg);
|
||||
}
|
||||
}
|
|
@ -1,157 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import moment from 'moment';
|
||||
import { ConcreteTaskInstance, RunContext, TaskInstance, TaskStatus } from '../task';
|
||||
import { addMiddlewareToChain } from './middleware';
|
||||
|
||||
interface BeforeSaveOpts {
|
||||
taskInstance: TaskInstance;
|
||||
}
|
||||
|
||||
const getMockTaskInstance = () => ({
|
||||
taskType: 'nice_task',
|
||||
params: { abc: 'def' },
|
||||
});
|
||||
const getMockConcreteTaskInstance = () => {
|
||||
const concrete: {
|
||||
id: string;
|
||||
version: number;
|
||||
attempts: number;
|
||||
status: TaskStatus;
|
||||
runAt: Date;
|
||||
state: any;
|
||||
taskType: string;
|
||||
params: any;
|
||||
} = {
|
||||
id: 'hy8o99o83',
|
||||
version: 1,
|
||||
attempts: 0,
|
||||
status: 'idle',
|
||||
runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()),
|
||||
state: {},
|
||||
taskType: 'nice_task',
|
||||
params: { abc: 'def' },
|
||||
};
|
||||
return concrete;
|
||||
};
|
||||
const getMockRunContext = (runTask: ConcreteTaskInstance) => ({
|
||||
taskInstance: runTask,
|
||||
kbnServer: {},
|
||||
});
|
||||
|
||||
const defaultBeforeSave = async (opts: BeforeSaveOpts) => {
|
||||
return opts;
|
||||
};
|
||||
|
||||
const defaultBeforeRun = async (opts: RunContext) => {
|
||||
return opts;
|
||||
};
|
||||
|
||||
describe('addMiddlewareToChain', () => {
|
||||
it('chains the beforeSave functions', () => {
|
||||
const m1 = {
|
||||
beforeSave: async (opts: BeforeSaveOpts) => {
|
||||
Object.assign(opts.taskInstance.params, { m1: true });
|
||||
return opts;
|
||||
},
|
||||
beforeRun: defaultBeforeRun,
|
||||
};
|
||||
const m2 = {
|
||||
beforeSave: async (opts: BeforeSaveOpts) => {
|
||||
Object.assign(opts.taskInstance.params, { m2: true });
|
||||
return opts;
|
||||
},
|
||||
beforeRun: defaultBeforeRun,
|
||||
};
|
||||
const m3 = {
|
||||
beforeSave: async (opts: BeforeSaveOpts) => {
|
||||
Object.assign(opts.taskInstance.params, { m3: true });
|
||||
return opts;
|
||||
},
|
||||
beforeRun: defaultBeforeRun,
|
||||
};
|
||||
|
||||
let middlewareChain;
|
||||
middlewareChain = addMiddlewareToChain(m1, m2);
|
||||
middlewareChain = addMiddlewareToChain(middlewareChain, m3);
|
||||
|
||||
middlewareChain.beforeSave({ taskInstance: getMockTaskInstance() }).then(saveOpts => {
|
||||
expect(saveOpts).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"taskInstance": Object {
|
||||
"params": Object {
|
||||
"abc": "def",
|
||||
"m1": true,
|
||||
"m2": true,
|
||||
"m3": true,
|
||||
},
|
||||
"taskType": "nice_task",
|
||||
},
|
||||
}
|
||||
`);
|
||||
});
|
||||
});
|
||||
|
||||
it('chains the beforeRun functions', () => {
|
||||
const m1 = {
|
||||
beforeSave: defaultBeforeSave,
|
||||
beforeRun: async (opts: RunContext) => {
|
||||
return {
|
||||
...opts,
|
||||
m1: true,
|
||||
};
|
||||
},
|
||||
};
|
||||
const m2 = {
|
||||
beforeSave: defaultBeforeSave,
|
||||
beforeRun: async (opts: RunContext) => {
|
||||
return {
|
||||
...opts,
|
||||
m2: true,
|
||||
};
|
||||
},
|
||||
};
|
||||
const m3 = {
|
||||
beforeSave: defaultBeforeSave,
|
||||
beforeRun: async (opts: RunContext) => {
|
||||
return {
|
||||
...opts,
|
||||
m3: true,
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
let middlewareChain;
|
||||
middlewareChain = addMiddlewareToChain(m1, m2);
|
||||
middlewareChain = addMiddlewareToChain(middlewareChain, m3);
|
||||
|
||||
middlewareChain
|
||||
.beforeRun(getMockRunContext(getMockConcreteTaskInstance()))
|
||||
.then(contextOpts => {
|
||||
expect(contextOpts).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"kbnServer": Object {},
|
||||
"m1": true,
|
||||
"m2": true,
|
||||
"m3": true,
|
||||
"taskInstance": Object {
|
||||
"attempts": 0,
|
||||
"id": "hy8o99o83",
|
||||
"params": Object {
|
||||
"abc": "def",
|
||||
},
|
||||
"runAt": 2018-09-18T05:33:09.588Z,
|
||||
"state": Object {},
|
||||
"status": "idle",
|
||||
"taskType": "nice_task",
|
||||
"version": 1,
|
||||
},
|
||||
}
|
||||
`);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,46 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { RunContext, TaskInstance } from '../task';
|
||||
|
||||
/*
|
||||
* BeforeSaveMiddlewareParams is nearly identical to RunContext, but
|
||||
* taskInstance is before save (no _id property)
|
||||
*
|
||||
* taskInstance property is guaranteed to exist. The params can optionally
|
||||
* include fields from an "options" object passed as the 2nd parameter to
|
||||
* taskManager.schedule()
|
||||
*/
|
||||
export interface BeforeSaveMiddlewareParams {
|
||||
taskInstance: TaskInstance;
|
||||
}
|
||||
|
||||
export type BeforeSaveFunction = (
|
||||
params: BeforeSaveMiddlewareParams
|
||||
) => Promise<BeforeSaveMiddlewareParams>;
|
||||
|
||||
export type BeforeRunFunction = (params: RunContext) => Promise<RunContext>;
|
||||
|
||||
export interface Middleware {
|
||||
beforeSave: BeforeSaveFunction;
|
||||
beforeRun: BeforeRunFunction;
|
||||
}
|
||||
|
||||
export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) {
|
||||
const beforeSave = middleware.beforeSave
|
||||
? (params: BeforeSaveMiddlewareParams) =>
|
||||
middleware.beforeSave(params).then(prevMiddleware.beforeSave)
|
||||
: prevMiddleware.beforeSave;
|
||||
|
||||
const beforeRun = middleware.beforeRun
|
||||
? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun)
|
||||
: prevMiddleware.beforeRun;
|
||||
|
||||
return {
|
||||
beforeSave,
|
||||
beforeRun,
|
||||
};
|
||||
}
|
|
@ -1,172 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { get } from 'lodash';
|
||||
import { RunContext } from '../task';
|
||||
import { sanitizeTaskDefinitions } from './sanitize_task_definitions';
|
||||
|
||||
interface Opts {
|
||||
numTasks: number;
|
||||
numWorkers?: number;
|
||||
}
|
||||
|
||||
const getMockTaskDefinitions = (opts: Opts) => {
|
||||
const { numTasks, numWorkers } = opts;
|
||||
const tasks: any = {};
|
||||
|
||||
for (let i = 0; i < numTasks; i++) {
|
||||
const type = `test_task_type_${i}`;
|
||||
tasks[type] = {
|
||||
type,
|
||||
title: 'Test',
|
||||
description: 'one super cool task',
|
||||
numWorkers: numWorkers ? numWorkers : 1,
|
||||
createTaskRunner(context: RunContext) {
|
||||
const incre = get(context, 'taskInstance.state.incre', -1);
|
||||
return {
|
||||
run: () => ({
|
||||
state: {
|
||||
incre: incre + 1,
|
||||
},
|
||||
runAt: Date.now(),
|
||||
}),
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
return tasks;
|
||||
};
|
||||
|
||||
describe('sanitizeTaskDefinitions', () => {
|
||||
it('provides tasks with defaults if there are no overrides', () => {
|
||||
const maxWorkers = 10;
|
||||
const overrideNumWorkers = {};
|
||||
const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 });
|
||||
const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers);
|
||||
|
||||
expect(result).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"test_task_type_0": Object {
|
||||
"createTaskRunner": [Function],
|
||||
"description": "one super cool task",
|
||||
"numWorkers": 1,
|
||||
"timeOut": "5m",
|
||||
"title": "Test",
|
||||
"type": "test_task_type_0",
|
||||
},
|
||||
"test_task_type_1": Object {
|
||||
"createTaskRunner": [Function],
|
||||
"description": "one super cool task",
|
||||
"numWorkers": 1,
|
||||
"timeOut": "5m",
|
||||
"title": "Test",
|
||||
"type": "test_task_type_1",
|
||||
},
|
||||
"test_task_type_2": Object {
|
||||
"createTaskRunner": [Function],
|
||||
"description": "one super cool task",
|
||||
"numWorkers": 1,
|
||||
"timeOut": "5m",
|
||||
"title": "Test",
|
||||
"type": "test_task_type_2",
|
||||
},
|
||||
}
|
||||
`);
|
||||
});
|
||||
|
||||
it('scales down task definitions workers if larger than max workers', () => {
|
||||
const maxWorkers = 2;
|
||||
const overrideNumWorkers = {};
|
||||
const taskDefinitions = getMockTaskDefinitions({ numTasks: 2, numWorkers: 5 });
|
||||
const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers);
|
||||
|
||||
expect(result).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"test_task_type_0": Object {
|
||||
"createTaskRunner": [Function],
|
||||
"description": "one super cool task",
|
||||
"numWorkers": 2,
|
||||
"timeOut": "5m",
|
||||
"title": "Test",
|
||||
"type": "test_task_type_0",
|
||||
},
|
||||
"test_task_type_1": Object {
|
||||
"createTaskRunner": [Function],
|
||||
"description": "one super cool task",
|
||||
"numWorkers": 2,
|
||||
"timeOut": "5m",
|
||||
"title": "Test",
|
||||
"type": "test_task_type_1",
|
||||
},
|
||||
}
|
||||
`);
|
||||
});
|
||||
|
||||
it('incorporates overrideNumWorkers to give certain type an override of number of workers', () => {
|
||||
const overrideNumWorkers = {
|
||||
test_task_type_0: 5,
|
||||
test_task_type_1: 2,
|
||||
};
|
||||
const maxWorkers = 5;
|
||||
const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 });
|
||||
const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers);
|
||||
|
||||
expect(result).toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"test_task_type_0": Object {
|
||||
"createTaskRunner": [Function],
|
||||
"description": "one super cool task",
|
||||
"numWorkers": 5,
|
||||
"timeOut": "5m",
|
||||
"title": "Test",
|
||||
"type": "test_task_type_0",
|
||||
},
|
||||
"test_task_type_1": Object {
|
||||
"createTaskRunner": [Function],
|
||||
"description": "one super cool task",
|
||||
"numWorkers": 2,
|
||||
"timeOut": "5m",
|
||||
"title": "Test",
|
||||
"type": "test_task_type_1",
|
||||
},
|
||||
"test_task_type_2": Object {
|
||||
"createTaskRunner": [Function],
|
||||
"description": "one super cool task",
|
||||
"numWorkers": 1,
|
||||
"timeOut": "5m",
|
||||
"title": "Test",
|
||||
"type": "test_task_type_2",
|
||||
},
|
||||
}
|
||||
`);
|
||||
});
|
||||
|
||||
it('throws a validation exception for invalid task definition', () => {
|
||||
const runsanitize = () => {
|
||||
const maxWorkers = 10;
|
||||
const overrideNumWorkers = {};
|
||||
const taskDefinitions = {
|
||||
some_kind_of_task: {
|
||||
fail: 'extremely', // cause a validation failure
|
||||
type: 'breaky_task',
|
||||
title: 'Test XYZ',
|
||||
description: `Actually this won't work`,
|
||||
createTaskRunner() {
|
||||
return {
|
||||
async run() {
|
||||
return {};
|
||||
},
|
||||
};
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
return sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers);
|
||||
};
|
||||
|
||||
expect(runsanitize).toThrowError();
|
||||
});
|
||||
});
|
|
@ -1,49 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import Joi from 'joi';
|
||||
import {
|
||||
SanitizedTaskDefinition,
|
||||
TaskDefinition,
|
||||
TaskDictionary,
|
||||
validateTaskDefinition,
|
||||
} from '../task';
|
||||
|
||||
/**
|
||||
* Sanitizes the system's task definitions. Task definitions have optional properties, and
|
||||
* this ensures they all are given a reasonable default. This also overrides certain task
|
||||
* definition properties with kibana.yml overrides (such as the `override_num_workers` config
|
||||
* value).
|
||||
*
|
||||
* @param maxWorkers - The maxiumum numer of workers allowed to run at once
|
||||
* @param taskDefinitions - The Kibana task definitions dictionary
|
||||
* @param overrideNumWorkers - The kibana.yml overrides numWorkers per task type.
|
||||
*/
|
||||
export function sanitizeTaskDefinitions(
|
||||
taskDefinitions: TaskDictionary<TaskDefinition> = {},
|
||||
maxWorkers: number,
|
||||
overrideNumWorkers: { [taskType: string]: number }
|
||||
): TaskDictionary<SanitizedTaskDefinition> {
|
||||
return Object.keys(taskDefinitions).reduce(
|
||||
(acc, type) => {
|
||||
const rawDefinition = taskDefinitions[type];
|
||||
rawDefinition.type = type;
|
||||
const definition = Joi.attempt(rawDefinition, validateTaskDefinition) as TaskDefinition;
|
||||
const numWorkers = Math.min(
|
||||
maxWorkers,
|
||||
overrideNumWorkers[definition.type] || definition.numWorkers || 1
|
||||
);
|
||||
|
||||
acc[type] = {
|
||||
...definition,
|
||||
numWorkers,
|
||||
};
|
||||
|
||||
return acc;
|
||||
},
|
||||
{} as TaskDictionary<SanitizedTaskDefinition>
|
||||
);
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
{
|
||||
"name": "task_manager",
|
||||
"version": "kibana",
|
||||
"config": {
|
||||
"@elastic/eslint-import-resolver-kibana": {
|
||||
"projectRoot": false
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,234 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import Joi from 'joi';
|
||||
|
||||
/*
|
||||
* Type definitions and validations for tasks.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A loosely typed definition of the elasticjs wrapper. It's beyond the scope
|
||||
* of this work to try to make a comprehensive type definition of this.
|
||||
*/
|
||||
export type ElasticJs = (action: string, args: any) => Promise<any>;
|
||||
|
||||
/**
|
||||
* The run context is passed into a task's run function as its sole argument.
|
||||
*/
|
||||
export interface RunContext {
|
||||
/**
|
||||
* The Kibana server object. This gives tasks full-access to the server object,
|
||||
* including the various ES options client functions
|
||||
*/
|
||||
kbnServer: object;
|
||||
|
||||
/**
|
||||
* The document describing the task instance, its params, state, id, etc.
|
||||
*/
|
||||
taskInstance: ConcreteTaskInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
* The return value of a task's run function should be a promise of RunResult.
|
||||
*/
|
||||
export interface RunResult {
|
||||
/**
|
||||
* Specifies the next run date / time for this task. If unspecified, this is
|
||||
* treated as a single-run task, and will not be rescheduled after
|
||||
* completion.
|
||||
*/
|
||||
runAt?: Date;
|
||||
|
||||
/**
|
||||
* If specified, indicates that the task failed to accomplish its work. This is
|
||||
* logged out as a warning, and the task will be reattempted after a delay.
|
||||
*/
|
||||
error?: object;
|
||||
|
||||
/**
|
||||
* The state which will be passed to the next run of this task (if this is a
|
||||
* recurring task). See the RunContext type definition for more details.
|
||||
*/
|
||||
state?: object;
|
||||
}
|
||||
|
||||
export const validateRunResult = Joi.object({
|
||||
runAt: Joi.date().optional(),
|
||||
error: Joi.object().optional(),
|
||||
state: Joi.object().optional(),
|
||||
}).optional();
|
||||
|
||||
export type RunFunction = () => Promise<RunResult | undefined | void>;
|
||||
|
||||
export type CancelFunction = () => Promise<RunResult | undefined | void>;
|
||||
|
||||
export interface CancellableTask {
|
||||
run: RunFunction;
|
||||
cancel?: CancelFunction;
|
||||
}
|
||||
|
||||
export type TaskRunCreatorFunction = (context: RunContext) => CancellableTask;
|
||||
|
||||
/**
|
||||
* Defines a task which can be scheduled and run by the Kibana
|
||||
* task manager.
|
||||
*/
|
||||
export interface TaskDefinition {
|
||||
/**
|
||||
* A unique identifier for the type of task being defined.
|
||||
*/
|
||||
type: string;
|
||||
|
||||
/**
|
||||
* A brief, human-friendly title for this task.
|
||||
*/
|
||||
title: string;
|
||||
|
||||
/**
|
||||
* An optional more detailed description of what this task does.
|
||||
*/
|
||||
description?: string;
|
||||
|
||||
/**
|
||||
* How long, in minutes, the system should wait for the task to complete
|
||||
* before it is considered to be timed out. (e.g. '5m', the default). If
|
||||
* the task takes longer than this, Kibana will send it a kill command and
|
||||
* the task will be re-attempted.
|
||||
*/
|
||||
timeOut?: string;
|
||||
|
||||
/**
|
||||
* The numer of workers / slots a running instance of this task occupies.
|
||||
* This defaults to 1.
|
||||
*/
|
||||
numWorkers?: number;
|
||||
|
||||
/**
|
||||
* Creates an object that has a run function which performs the task's work,
|
||||
* and an optional cancel function which cancels the task.
|
||||
*/
|
||||
createTaskRunner: TaskRunCreatorFunction;
|
||||
}
|
||||
|
||||
/**
|
||||
* A task definition with all of its properties set to a valid value.
|
||||
*/
|
||||
export interface SanitizedTaskDefinition extends TaskDefinition {
|
||||
numWorkers: number;
|
||||
}
|
||||
|
||||
export const validateTaskDefinition = Joi.object({
|
||||
type: Joi.string().required(),
|
||||
title: Joi.string().optional(),
|
||||
description: Joi.string().optional(),
|
||||
timeOut: Joi.string().default('5m'),
|
||||
numWorkers: Joi.number().default(1),
|
||||
createTaskRunner: Joi.func().required(),
|
||||
}).default();
|
||||
|
||||
/**
|
||||
* A dictionary mapping task types to their definitions.
|
||||
*/
|
||||
export interface TaskDictionary<T extends TaskDefinition> {
|
||||
[taskType: string]: T;
|
||||
}
|
||||
|
||||
export type TaskStatus = 'idle' | 'running';
|
||||
|
||||
/*
|
||||
* A task instance represents all of the data required to store, fetch,
|
||||
* and execute a task.
|
||||
*/
|
||||
export interface TaskInstance {
|
||||
/**
|
||||
* Optional ID that can be passed by the caller. When ID is undefined, ES
|
||||
* will auto-generate a unique id. Otherwise, ID will be used to either
|
||||
* create a new document, or update existing document
|
||||
*/
|
||||
id?: string;
|
||||
|
||||
/**
|
||||
* The task definition type whose run function will execute this instance.
|
||||
*/
|
||||
taskType: string;
|
||||
|
||||
/**
|
||||
* The date and time that this task is scheduled to be run. It is not
|
||||
* guaranteed to run at this time, but it is guaranteed not to run earlier
|
||||
* than this. Defaults to immediately.
|
||||
*/
|
||||
runAt?: Date;
|
||||
|
||||
/**
|
||||
* An interval in minutes (e.g. '5m'). If specified, this is a recurring task.
|
||||
*/
|
||||
interval?: string;
|
||||
|
||||
/**
|
||||
* A task-specific set of parameters, used by the task's run function to tailor
|
||||
* its work. This is generally user-input, such as { sms: '333-444-2222' }.
|
||||
*/
|
||||
params: object;
|
||||
|
||||
/**
|
||||
* The state passed into the task's run function, and returned by the previous
|
||||
* run. If there was no previous run, or if the previous run did not return
|
||||
* any state, this will be the empy object: {}
|
||||
*/
|
||||
state?: object;
|
||||
|
||||
/**
|
||||
* The id of the user who scheduled this task.
|
||||
*/
|
||||
user?: string;
|
||||
|
||||
/**
|
||||
* Used to group tasks for querying. So, reporting might schedule tasks with a scope of 'reporting',
|
||||
* and then query such tasks to provide a glimpse at only reporting tasks, rather than at all tasks.
|
||||
*/
|
||||
scope?: string | string[];
|
||||
}
|
||||
|
||||
/**
|
||||
* A task instance that has an id and is ready for storage.
|
||||
*/
|
||||
export interface ConcreteTaskInstance extends TaskInstance {
|
||||
/**
|
||||
* The id of the Elastic document that stores this instance's data. This can
|
||||
* be passed by the caller when scheduling the task.
|
||||
*/
|
||||
id: string;
|
||||
|
||||
/**
|
||||
* The version of the Elaticsearch document.
|
||||
*/
|
||||
version: number;
|
||||
|
||||
/**
|
||||
* The number of unsuccessful attempts since the last successful run. This
|
||||
* will be zeroed out after a successful run.
|
||||
*/
|
||||
attempts: number;
|
||||
|
||||
/**
|
||||
* Indicates whether or not the task is currently running.
|
||||
*/
|
||||
status: TaskStatus;
|
||||
|
||||
/**
|
||||
* The date and time that this task is scheduled to be run. It is not guaranteed
|
||||
* to run at this time, but it is guaranteed not to run earlier than this.
|
||||
*/
|
||||
runAt: Date;
|
||||
|
||||
/**
|
||||
* The state passed into the task's run function, and returned by the previous
|
||||
* run. If there was no previous run, or if the previous run did not return
|
||||
* any state, this will be the empy object: {}
|
||||
*/
|
||||
state: object;
|
||||
}
|
|
@ -1,148 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import sinon from 'sinon';
|
||||
import { bindToElasticSearchStatus, TaskManager } from './task_manager';
|
||||
|
||||
describe('TaskManager', () => {
|
||||
let clock: sinon.SinonFakeTimers;
|
||||
const defaultConfig = {
|
||||
task_manager: {
|
||||
max_workers: 10,
|
||||
override_num_workers: {},
|
||||
index: 'foo',
|
||||
max_attempts: 9,
|
||||
poll_interval: 6000000,
|
||||
},
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
clock = sinon.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => clock.restore());
|
||||
|
||||
test('starts / stops the poller when es goes green / red', async () => {
|
||||
const handlers: any = {};
|
||||
const es = {
|
||||
status: {
|
||||
on: (color: string, handler: any) => (handlers[color] = () => Promise.resolve(handler())),
|
||||
},
|
||||
};
|
||||
const start = sinon.spy(async () => undefined);
|
||||
const stop = sinon.spy(async () => undefined);
|
||||
const init = sinon.spy(async () => undefined);
|
||||
|
||||
bindToElasticSearchStatus(es, { info: _.noop, debug: _.noop }, { stop, start }, { init });
|
||||
|
||||
await handlers.green();
|
||||
sinon.assert.calledOnce(init);
|
||||
sinon.assert.calledOnce(start);
|
||||
sinon.assert.notCalled(stop);
|
||||
|
||||
await handlers.red();
|
||||
sinon.assert.calledOnce(init);
|
||||
sinon.assert.calledOnce(start);
|
||||
sinon.assert.calledOnce(stop);
|
||||
|
||||
await handlers.green();
|
||||
sinon.assert.calledTwice(init);
|
||||
sinon.assert.calledTwice(start);
|
||||
sinon.assert.calledOnce(stop);
|
||||
});
|
||||
|
||||
test('disallows schedule before init', async () => {
|
||||
const { opts } = testOpts();
|
||||
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
|
||||
const task = {
|
||||
taskType: 'foo',
|
||||
params: {},
|
||||
};
|
||||
await expect(client.schedule(task)).rejects.toThrow(/The task manager is initializing/i);
|
||||
});
|
||||
|
||||
test('disallows fetch before init', async () => {
|
||||
const { opts } = testOpts();
|
||||
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
|
||||
await expect(client.fetch({})).rejects.toThrow(/The task manager is initializing/i);
|
||||
});
|
||||
|
||||
test('disallows remove before init', async () => {
|
||||
const { opts } = testOpts();
|
||||
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
|
||||
await expect(client.remove('23')).rejects.toThrow(/The task manager is initializing/i);
|
||||
});
|
||||
|
||||
test('allows middleware registration before init', () => {
|
||||
const { opts } = testOpts();
|
||||
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
|
||||
const middleware = {
|
||||
beforeSave: async (saveOpts: any) => saveOpts,
|
||||
beforeRun: async (runOpts: any) => runOpts,
|
||||
};
|
||||
expect(() => client.addMiddleware(middleware)).not.toThrow();
|
||||
});
|
||||
|
||||
test('disallows middleware registration after init', async () => {
|
||||
const { $test, opts } = testOpts();
|
||||
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
|
||||
const middleware = {
|
||||
beforeSave: async (saveOpts: any) => saveOpts,
|
||||
beforeRun: async (runOpts: any) => runOpts,
|
||||
};
|
||||
|
||||
$test.afterPluginsInit();
|
||||
|
||||
expect(() => client.addMiddleware(middleware)).toThrow(
|
||||
/Cannot add middleware after the task manager is initialized/i
|
||||
);
|
||||
});
|
||||
|
||||
function testOpts() {
|
||||
const $test = {
|
||||
events: {} as any,
|
||||
afterPluginsInit: _.noop,
|
||||
};
|
||||
|
||||
const opts = {
|
||||
config: {
|
||||
get: (path: string) => _.get(defaultConfig, path),
|
||||
},
|
||||
kbnServer: {
|
||||
uiExports: {
|
||||
taskDefinitions: {},
|
||||
},
|
||||
afterPluginsInit(callback: any) {
|
||||
$test.afterPluginsInit = callback;
|
||||
},
|
||||
},
|
||||
server: {
|
||||
log: sinon.spy(),
|
||||
decorate(...args: any[]) {
|
||||
_.set(opts, args.slice(0, -1), _.last(args));
|
||||
},
|
||||
plugins: {
|
||||
elasticsearch: {
|
||||
getCluster() {
|
||||
return { callWithInternalUser: _.noop };
|
||||
},
|
||||
status: {
|
||||
on(eventName: string, callback: () => any) {
|
||||
$test.events[eventName] = callback;
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
return {
|
||||
$test,
|
||||
opts,
|
||||
};
|
||||
}
|
||||
});
|
|
@ -1,191 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { fillPool } from './lib/fill_pool';
|
||||
import { TaskManagerLogger } from './lib/logger';
|
||||
import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware';
|
||||
import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions';
|
||||
import { ConcreteTaskInstance, RunContext, TaskInstance } from './task';
|
||||
import { SanitizedTaskDefinition, TaskDefinition, TaskDictionary } from './task';
|
||||
import { TaskPoller } from './task_poller';
|
||||
import { TaskPool } from './task_pool';
|
||||
import { TaskManagerRunner } from './task_runner';
|
||||
import { FetchOpts, TaskStore } from './task_store';
|
||||
|
||||
/*
|
||||
* The TaskManager is the public interface into the task manager system. This glues together
|
||||
* all of the disparate modules in one integration point. The task manager operates in two different ways:
|
||||
*
|
||||
* - pre-init, it allows middleware registration, but disallows task manipulation
|
||||
* - post-init, it disallows middleware registration, but allows task manipulation
|
||||
*
|
||||
* Due to its complexity, this is mostly tested by integration tests (see readme).
|
||||
*/
|
||||
|
||||
/**
|
||||
* The public interface into the task manager system.
|
||||
*/
|
||||
export class TaskManager {
|
||||
private isInitialized = false;
|
||||
private maxWorkers: number;
|
||||
private overrideNumWorkers: { [taskType: string]: number };
|
||||
private definitions: TaskDictionary<SanitizedTaskDefinition>;
|
||||
private store?: TaskStore;
|
||||
private poller?: TaskPoller;
|
||||
private middleware = {
|
||||
beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts,
|
||||
beforeRun: async (runOpts: RunContext) => runOpts,
|
||||
};
|
||||
|
||||
/**
|
||||
* Initializes the task manager, preventing any further addition of middleware,
|
||||
* enabling the task manipulation methods, and beginning the background polling
|
||||
* mechanism.
|
||||
*/
|
||||
public constructor(kbnServer: any, server: any, config: any) {
|
||||
this.maxWorkers = config.get('xpack.task_manager.max_workers');
|
||||
this.overrideNumWorkers = config.get('xpack.task_manager.override_num_workers');
|
||||
this.definitions = {};
|
||||
|
||||
const logger = new TaskManagerLogger((...args: any[]) => server.log(...args));
|
||||
|
||||
kbnServer.afterPluginsInit(() => {
|
||||
const store = new TaskStore({
|
||||
callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser,
|
||||
index: config.get('xpack.task_manager.index'),
|
||||
maxAttempts: config.get('xpack.task_manager.max_attempts'),
|
||||
supportedTypes: Object.keys(this.definitions),
|
||||
});
|
||||
const pool = new TaskPool({
|
||||
logger,
|
||||
maxWorkers: this.maxWorkers,
|
||||
});
|
||||
const createRunner = (instance: ConcreteTaskInstance) =>
|
||||
new TaskManagerRunner({
|
||||
logger,
|
||||
kbnServer,
|
||||
instance,
|
||||
store,
|
||||
definition: this.definitions[instance.taskType],
|
||||
beforeRun: this.middleware.beforeRun,
|
||||
});
|
||||
const poller = new TaskPoller({
|
||||
logger,
|
||||
pollInterval: config.get('xpack.task_manager.poll_interval'),
|
||||
work() {
|
||||
return fillPool(pool.run, store.fetchAvailableTasks, createRunner);
|
||||
},
|
||||
});
|
||||
|
||||
bindToElasticSearchStatus(server.plugins.elasticsearch, logger, poller, store);
|
||||
|
||||
this.store = store;
|
||||
this.poller = poller;
|
||||
this.isInitialized = true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Method for allowing consumers to register task definitions into the system.
|
||||
* @param taskDefinitions - The Kibana task definitions dictionary
|
||||
*/
|
||||
public registerTaskDefinitions(taskDefinitions: TaskDictionary<TaskDefinition>) {
|
||||
this.assertUninitialized('register task definitions');
|
||||
const duplicate = Object.keys(taskDefinitions).find(k => !!this.definitions[k]);
|
||||
if (duplicate) {
|
||||
throw new Error(`Task ${duplicate} is already defined!`);
|
||||
}
|
||||
|
||||
const sanitized = sanitizeTaskDefinitions(
|
||||
taskDefinitions,
|
||||
this.maxWorkers,
|
||||
this.overrideNumWorkers
|
||||
);
|
||||
Object.assign(this.definitions, sanitized);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds middleware to the task manager, such as adding security layers, loggers, etc.
|
||||
*
|
||||
* @param {Middleware} middleware - The middlware being added.
|
||||
*/
|
||||
public addMiddleware(middleware: Middleware) {
|
||||
this.assertUninitialized('add middleware');
|
||||
const prevMiddleWare = this.middleware;
|
||||
this.middleware = addMiddlewareToChain(prevMiddleWare, middleware);
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a task.
|
||||
*
|
||||
* @param task - The task being scheduled.
|
||||
*/
|
||||
public async schedule(taskInstance: TaskInstance, options?: any) {
|
||||
this.assertInitialized();
|
||||
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
|
||||
...options,
|
||||
taskInstance,
|
||||
});
|
||||
const result = await this.store!.schedule(modifiedTask);
|
||||
this.poller!.attemptWork();
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches a paginatable list of scheduled tasks.
|
||||
*
|
||||
* @param opts - The query options used to filter tasks
|
||||
*/
|
||||
public async fetch(opts: FetchOpts) {
|
||||
this.assertInitialized();
|
||||
return this.store!.fetch(opts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the specified task from the index.
|
||||
*
|
||||
* @param {string} id
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
public async remove(id: string) {
|
||||
this.assertInitialized();
|
||||
return this.store!.remove(id);
|
||||
}
|
||||
|
||||
private assertUninitialized(message: string) {
|
||||
if (this.isInitialized) {
|
||||
throw new Error(`Cannot ${message} after the task manager is initialized.`);
|
||||
}
|
||||
}
|
||||
|
||||
private assertInitialized() {
|
||||
if (!this.isInitialized) {
|
||||
throw new Error('The task manager is initializing.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This is exported for test purposes. It is responsible for starting / stopping
|
||||
// the poller based on the elasticsearch plugin status.
|
||||
export function bindToElasticSearchStatus(
|
||||
elasticsearch: any,
|
||||
logger: { debug: (s: string) => any; info: (s: string) => any },
|
||||
poller: { stop: () => any; start: () => Promise<any> },
|
||||
store: { init: () => Promise<any> }
|
||||
) {
|
||||
elasticsearch.status.on('red', () => {
|
||||
logger.debug('Lost connection to Elasticsearch, stopping the poller.');
|
||||
poller.stop();
|
||||
});
|
||||
|
||||
elasticsearch.status.on('green', async () => {
|
||||
logger.debug('Initializing store');
|
||||
await store.init();
|
||||
logger.debug('Starting poller');
|
||||
await poller.start();
|
||||
logger.info('Connected to Elasticsearch, and watching for tasks');
|
||||
});
|
||||
}
|
|
@ -1,135 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import sinon from 'sinon';
|
||||
import { TaskPoller } from './task_poller';
|
||||
import { mockLogger, resolvable, sleep } from './test_utils';
|
||||
|
||||
describe('TaskPoller', () => {
|
||||
describe('interval tests', () => {
|
||||
let clock: sinon.SinonFakeTimers;
|
||||
|
||||
beforeEach(() => {
|
||||
clock = sinon.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => clock.restore());
|
||||
|
||||
test('runs the work function on an interval', async () => {
|
||||
const pollInterval = _.random(10, 20);
|
||||
const done = resolvable();
|
||||
const work = sinon.spy(() => {
|
||||
done.resolve();
|
||||
return Promise.resolve();
|
||||
});
|
||||
const poller = new TaskPoller({
|
||||
pollInterval,
|
||||
work,
|
||||
logger: mockLogger(),
|
||||
});
|
||||
|
||||
poller.start();
|
||||
|
||||
sinon.assert.calledOnce(work);
|
||||
await done;
|
||||
|
||||
clock.tick(pollInterval - 1);
|
||||
sinon.assert.calledOnce(work);
|
||||
clock.tick(1);
|
||||
sinon.assert.calledTwice(work);
|
||||
});
|
||||
});
|
||||
|
||||
test('logs, but does not crash if the work function fails', async () => {
|
||||
let count = 0;
|
||||
const logger = mockLogger();
|
||||
const doneWorking = resolvable();
|
||||
const poller = new TaskPoller({
|
||||
logger,
|
||||
pollInterval: 1,
|
||||
work: async () => {
|
||||
++count;
|
||||
if (count === 1) {
|
||||
throw new Error('Dang it!');
|
||||
}
|
||||
if (count > 1) {
|
||||
poller.stop();
|
||||
doneWorking.resolve();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
poller.start();
|
||||
|
||||
await doneWorking;
|
||||
|
||||
expect(count).toEqual(2);
|
||||
sinon.assert.calledWithMatch(logger.error, /Dang it/i);
|
||||
});
|
||||
|
||||
test('is stoppable', async () => {
|
||||
const doneWorking = resolvable();
|
||||
const work = sinon.spy(async () => {
|
||||
poller.stop();
|
||||
doneWorking.resolve();
|
||||
});
|
||||
|
||||
const poller = new TaskPoller({
|
||||
logger: mockLogger(),
|
||||
pollInterval: 1,
|
||||
work,
|
||||
});
|
||||
|
||||
poller.start();
|
||||
await doneWorking;
|
||||
await sleep(10);
|
||||
|
||||
sinon.assert.calledOnce(work);
|
||||
});
|
||||
|
||||
test('disregards duplicate calls to "start"', async () => {
  const doneWorking = resolvable();
  // Work blocks until released, so duplicate starts would show up as extra calls.
  const work = sinon.spy(async () => {
    await doneWorking;
  });
  const poller = new TaskPoller({
    pollInterval: 1,
    logger: mockLogger(),
    work,
  });

  // Repeated starts must be idempotent while the poller is already running.
  poller.start();
  poller.start();
  poller.start();
  poller.start();

  poller.stop();

  doneWorking.resolve();

  sinon.assert.calledOnce(work);
});
|
||||
|
||||
test('waits for work before polling', async () => {
  const doneWorking = resolvable();
  // Work takes longer (10ms) than the poll interval (1ms); the poller must not
  // overlap runs while work is still in flight.
  const work = sinon.spy(async () => {
    await sleep(10);
    poller.stop();
    doneWorking.resolve();
  });
  const poller = new TaskPoller({
    pollInterval: 1,
    logger: mockLogger(),
    work,
  });

  poller.start();
  await doneWorking;

  sinon.assert.calledOnce(work);
});
|
||||
});
|
|
@ -1,95 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This module contains the logic for polling the task manager index for new work.
|
||||
*/
|
||||
|
||||
import { Logger } from './lib/logger';
|
||||
|
||||
type WorkFn = () => Promise<void>;
|
||||
|
||||
interface Opts {
|
||||
pollInterval: number;
|
||||
logger: Logger;
|
||||
work: WorkFn;
|
||||
}
|
||||
|
||||
/**
 * Performs work on a scheduled interval, logging any errors. This waits for work to complete
 * (or error) prior to attempting another run.
 */
export class TaskPoller {
  private isStarted = false;
  private isWorking = false;
  // Handle of the pending setTimeout between polls; cleared by stop().
  private timeout: any;
  private pollInterval: number;
  private logger: Logger;
  private work: WorkFn;

  /**
   * Constructs a new TaskPoller.
   *
   * @param opts
   * @prop {number} pollInterval - How often, in milliseconds, we will run the work function
   * @prop {Logger} logger - The task manager logger
   * @prop {WorkFn} work - An empty, asynchronous function that performs the desired work
   */
  constructor(opts: Opts) {
    this.pollInterval = opts.pollInterval;
    this.logger = opts.logger;
    this.work = opts.work;
  }

  /**
   * Starts the poller. If the poller is already running, this has no effect.
   */
  public async start() {
    if (this.isStarted) {
      return;
    }
    this.isStarted = true;

    const poll = async () => {
      await this.attemptWork();

      // Re-check isStarted: stop() may have been called during attemptWork,
      // in which case we must not schedule another run.
      if (this.isStarted) {
        this.timeout = setTimeout(poll, this.pollInterval);
      }
    };

    // Intentionally not awaited: the first work attempt begins synchronously
    // (up to its first await), and start() does not block on the poll loop.
    // NOTE(review): callers appear to rely on this synchronous first attempt.
    poll();
  }

  /**
   * Stops the poller.
   */
  public stop() {
    this.isStarted = false;
    clearTimeout(this.timeout);
    this.timeout = undefined;
  }

  /**
   * Runs the work function. If the work function is currently running,
   * this has no effect.
   */
  public async attemptWork() {
    // Guard against overlapping runs: only one work() may be in flight.
    if (this.isWorking) {
      return;
    }

    this.isWorking = true;

    try {
      await this.work();
    } catch (error) {
      // Errors are logged, never rethrown, so the poll loop survives failures.
      this.logger.error(`Failed to poll for work ${error.stack}`);
    } finally {
      this.isWorking = false;
    }
  }
}
|
|
@ -1,201 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import sinon from 'sinon';
|
||||
import { TaskPool } from './task_pool';
|
||||
import { mockLogger, resolvable, sleep } from './test_utils';
|
||||
|
||||
describe('TaskPool', () => {
  // occupiedWorkers should reflect the sum of numWorkers of all running tasks.
  test('occupiedWorkers are a sum of worker costs', async () => {
    const pool = new TaskPool({
      maxWorkers: 200,
      logger: mockLogger(),
    });

    const result = await pool.run([
      { ...mockTask(), numWorkers: 10 },
      { ...mockTask(), numWorkers: 20 },
      { ...mockTask(), numWorkers: 30 },
    ]);

    expect(result).toBeTruthy();
    expect(pool.occupiedWorkers).toEqual(60);
  });

  // availableWorkers = maxWorkers - occupiedWorkers.
  test('availableWorkers are a function of total_capacity - occupiedWorkers', async () => {
    const pool = new TaskPool({
      maxWorkers: 100,
      logger: mockLogger(),
    });

    const result = await pool.run([
      { ...mockTask(), numWorkers: 20 },
      { ...mockTask(), numWorkers: 30 },
      { ...mockTask(), numWorkers: 40 },
    ]);

    expect(result).toBeTruthy();
    expect(pool.availableWorkers).toEqual(10);
  });

  // run() must stop dispatching (and return false) once capacity is exhausted.
  test('does not run tasks that are beyond its available capacity', async () => {
    const pool = new TaskPool({
      maxWorkers: 10,
      logger: mockLogger(),
    });

    const shouldRun = mockRun();
    const shouldNotRun = mockRun();

    const result = await pool.run([
      { ...mockTask(), numWorkers: 9, run: shouldRun },
      { ...mockTask(), numWorkers: 9, run: shouldNotRun },
    ]);

    expect(result).toBeFalsy();
    expect(pool.availableWorkers).toEqual(1);
    sinon.assert.calledOnce(shouldRun);
    sinon.assert.notCalled(shouldNotRun);
  });

  // A completed task releases its workers back to the pool.
  test('clears up capacity when a task completes', async () => {
    const pool = new TaskPool({
      maxWorkers: 10,
      logger: mockLogger(),
    });

    const firstWork = resolvable();
    const firstRun = sinon.spy(async () => {
      await sleep(0);
      firstWork.resolve();
    });
    const secondWork = resolvable();
    const secondRun = sinon.spy(async () => {
      await sleep(0);
      secondWork.resolve();
    });

    const result = await pool.run([
      { ...mockTask(), numWorkers: 9, run: firstRun },
      { ...mockTask(), numWorkers: 2, run: secondRun },
    ]);

    // Second task (cost 2) does not fit next to the first (cost 9 of 10).
    expect(result).toBeFalsy();
    expect(pool.occupiedWorkers).toEqual(9);
    expect(pool.availableWorkers).toEqual(1);

    await firstWork;
    sinon.assert.calledOnce(firstRun);
    sinon.assert.notCalled(secondRun);

    await pool.run([{ ...mockTask(), numWorkers: 2, run: secondRun }]);

    expect(pool.occupiedWorkers).toEqual(2);
    expect(pool.availableWorkers).toEqual(8);

    await secondWork;

    expect(pool.occupiedWorkers).toEqual(0);
    expect(pool.availableWorkers).toEqual(10);
    sinon.assert.calledOnce(secondRun);
  });

  // Expired tasks must be cancelled (freeing capacity) before new tasks run.
  test('run cancels expired tasks prior to running new tasks', async () => {
    const pool = new TaskPool({
      maxWorkers: 10,
      logger: mockLogger(),
    });

    const expired = resolvable();
    const shouldRun = sinon.spy(() => Promise.resolve());
    const shouldNotRun = sinon.spy(() => Promise.resolve());
    const result = await pool.run([
      {
        ...mockTask(),
        numWorkers: 9,
        async run() {
          // Mark ourselves expired mid-run so the next pool.run cancels us.
          this.isExpired = true;
          expired.resolve();
          await sleep(10);
          return {};
        },
        cancel: shouldRun,
      },
      {
        ...mockTask(),
        numWorkers: 1,
        async run() {
          await sleep(10);
          return {};
        },
        cancel: shouldNotRun,
      },
    ]);

    expect(result).toBeTruthy();
    expect(pool.occupiedWorkers).toEqual(10);
    expect(pool.availableWorkers).toEqual(0);

    await expired;

    expect(await pool.run([{ ...mockTask(), numWorkers: 7 }])).toBeTruthy();
    sinon.assert.calledOnce(shouldRun);
    sinon.assert.notCalled(shouldNotRun);

    expect(pool.occupiedWorkers).toEqual(8);
    expect(pool.availableWorkers).toEqual(2);
  });

  // A throwing cancel() must be logged, not crash the pool.
  test('logs if cancellation errors', async () => {
    const logger = mockLogger();
    const pool = new TaskPool({
      logger,
      maxWorkers: 20,
    });

    const cancelled = resolvable();
    const result = await pool.run([
      {
        ...mockTask(),
        numWorkers: 7,
        async run() {
          this.isExpired = true;
          await sleep(10);
          return {};
        },
        async cancel() {
          cancelled.resolve();
          throw new Error('Dern!');
        },
        toString: () => '"shooooo!"',
      },
    ]);

    expect(result).toBeTruthy();
    await pool.run([]);

    expect(pool.occupiedWorkers).toEqual(0);

    // Allow the task to cancel...
    await cancelled;

    sinon.assert.calledWithMatch(logger.error, /Failed to cancel task "shooooo!"/);
  });

  // Spy run function that resolves on the next tick.
  function mockRun() {
    return sinon.spy(async () => sleep(0));
  }

  // Minimal TaskRunner-shaped object; individual tests override fields via spread.
  function mockTask() {
    return {
      numWorkers: 1,
      isExpired: false,
      cancel: async () => undefined,
      claimOwnership: async () => true,
      run: mockRun(),
    };
  }
});
|
|
@ -1,109 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This module contains the logic that ensures we don't run too many
|
||||
* tasks at once in a given Kibana instance.
|
||||
*/
|
||||
|
||||
import { Logger } from './lib/logger';
|
||||
import { TaskRunner } from './task_runner';
|
||||
|
||||
interface Opts {
|
||||
maxWorkers: number;
|
||||
logger: Logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs tasks in batches, taking costs into account.
|
||||
*/
|
||||
export class TaskPool {
|
||||
private maxWorkers: number;
|
||||
private running = new Set<TaskRunner>();
|
||||
private logger: Logger;
|
||||
|
||||
/**
|
||||
* Creates an instance of TaskPool.
|
||||
*
|
||||
* @param {Opts} opts
|
||||
* @prop {number} maxWorkers - The total number of workers / work slots available
|
||||
* (e.g. maxWorkers is 4, then 2 tasks of cost 2 can run at a time, or 4 tasks of cost 1)
|
||||
* @prop {Logger} logger - The task manager logger.
|
||||
*/
|
||||
constructor(opts: Opts) {
|
||||
this.maxWorkers = opts.maxWorkers;
|
||||
this.logger = opts.logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets how many workers are currently in use.
|
||||
*/
|
||||
get occupiedWorkers() {
|
||||
let total = 0;
|
||||
|
||||
this.running.forEach(({ numWorkers }) => (total += numWorkers));
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets how many workers are currently available.
|
||||
*/
|
||||
get availableWorkers() {
|
||||
return this.maxWorkers - this.occupiedWorkers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to run the specified list of tasks. Returns true if it was able
|
||||
* to start every task in the list, false if there was not enough capacity
|
||||
* to run every task.
|
||||
*
|
||||
* @param {TaskRunner[]} tasks
|
||||
* @returns {Promise<boolean>}
|
||||
*/
|
||||
public run = (tasks: TaskRunner[]) => {
|
||||
this.cancelExpiredTasks();
|
||||
return this.attemptToRun(tasks);
|
||||
};
|
||||
|
||||
private async attemptToRun(tasks: TaskRunner[]) {
|
||||
for (const task of tasks) {
|
||||
if (this.availableWorkers < task.numWorkers) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (await task.claimOwnership()) {
|
||||
this.running.add(task);
|
||||
task
|
||||
.run()
|
||||
.catch(error => {
|
||||
this.logger.warning(`Task ${task} failed in attempt to run: ${error.stack}`);
|
||||
})
|
||||
.then(() => this.running.delete(task));
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private cancelExpiredTasks() {
|
||||
for (const task of this.running) {
|
||||
if (task.isExpired) {
|
||||
this.cancelTask(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async cancelTask(task: TaskRunner) {
|
||||
try {
|
||||
this.logger.debug(`Cancelling expired task ${task}.`);
|
||||
this.running.delete(task);
|
||||
await task.cancel();
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to cancel task ${task}: ${error.stack}`);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,281 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import sinon from 'sinon';
|
||||
import { minutesFromNow } from './lib/intervals';
|
||||
import { ConcreteTaskInstance, TaskDefinition } from './task';
|
||||
import { TaskManagerRunner } from './task_runner';
|
||||
|
||||
describe('TaskManagerRunner', () => {
  // id/taskType/toString expose the underlying instance for logs and debugging.
  test('provides details about the task that is running', () => {
    const { runner } = testOpts({
      instance: {
        id: 'foo',
        taskType: 'bar',
      },
    });

    expect(runner.id).toEqual('foo');
    expect(runner.taskType).toEqual('bar');
    expect(runner.toString()).toEqual('bar "foo"');
  });

  // Only undefined or { runAt?, error?, state? } shapes are valid run results.
  test('warns if the task returns an unexpected result', async () => {
    await allowsReturnType(undefined);
    await allowsReturnType({});
    await allowsReturnType({
      runAt: new Date(),
    });
    await allowsReturnType({
      error: new Error('Dang it!'),
    });
    await allowsReturnType({
      state: { shazm: true },
    });
    await disallowsReturnType('hm....');
    await disallowsReturnType({
      whatIsThis: '?!!?',
    });
  });

  // A failed run bumps attempts and pushes runAt into the future, keeping params/state.
  test('queues a reattempt if the task fails', async () => {
    const initialAttempts = _.random(0, 2);
    const id = Date.now().toString();
    const { runner, store } = testOpts({
      instance: {
        id,
        attempts: initialAttempts,
        params: { a: 'b' },
        state: { hey: 'there' },
      },
      definition: {
        createTaskRunner: () => ({
          async run() {
            throw new Error('Dangit!');
          },
        }),
      },
    });

    await runner.run();

    sinon.assert.calledOnce(store.update);
    const instance = store.update.args[0][0];

    expect(instance.id).toEqual(id);
    expect(instance.attempts).toEqual(initialAttempts + 1);
    expect(instance.runAt.getTime()).toBeGreaterThan(Date.now());
    expect(instance.params).toEqual({ a: 'b' });
    expect(instance.state).toEqual({ hey: 'there' });
  });

  // Recurring tasks are rescheduled one interval out.
  test('reschedules tasks that have an interval', async () => {
    const { runner, store } = testOpts({
      instance: {
        interval: '10m',
      },
    });

    await runner.run();

    sinon.assert.calledOnce(store.update);
    const instance = store.update.args[0][0];

    expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime());
    expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime());
  });

  // A run result's runAt is honored as the next scheduled time.
  test('reschedules tasks that return a runAt', async () => {
    const runAt = minutesFromNow(_.random(1, 10));
    const { runner, store } = testOpts({
      definition: {
        createTaskRunner: () => ({
          async run() {
            return { runAt };
          },
        }),
      },
    });

    await runner.run();

    sinon.assert.calledOnce(store.update);
    sinon.assert.calledWithMatch(store.update, { runAt });
  });

  // Explicit runAt from the task takes precedence over the instance interval.
  test('tasks that return runAt override interval', async () => {
    const runAt = minutesFromNow(_.random(5));
    const { runner, store } = testOpts({
      instance: {
        interval: '20m',
      },
      definition: {
        createTaskRunner: () => ({
          async run() {
            return { runAt };
          },
        }),
      },
    });

    await runner.run();

    sinon.assert.calledOnce(store.update);
    sinon.assert.calledWithMatch(store.update, { runAt });
  });

  // One-shot tasks (no interval, no runAt result) are removed from the store.
  test('removes non-recurring tasks after they complete', async () => {
    const id = _.random(1, 20).toString();
    const { runner, store } = testOpts({
      instance: {
        id,
        interval: undefined,
      },
      definition: {
        createTaskRunner: () => ({
          async run() {
            return undefined;
          },
        }),
      },
    });

    await runner.run();

    sinon.assert.calledOnce(store.remove);
    sinon.assert.calledWith(store.remove, id);
  });

  // cancel() delegates to the task's own cancel when one is defined.
  test('cancel cancels the task runner, if it is cancellable', async () => {
    let wasCancelled = false;
    const { runner, logger } = testOpts({
      definition: {
        createTaskRunner: () => ({
          async run() {
            await new Promise(r => setTimeout(r, 1000));
          },
          async cancel() {
            wasCancelled = true;
          },
        }),
      },
    });

    const promise = runner.run();
    await new Promise(r => setTimeout(r, 1));
    await runner.cancel();
    await promise;

    expect(wasCancelled).toBeTruthy();
    sinon.assert.neverCalledWithMatch(logger.warning, /not cancellable/);
  });

  // cancel() on a task without a cancel hook logs a warning instead of throwing.
  test('warns if cancel is called on a non-cancellable task', async () => {
    const { runner, logger } = testOpts({
      definition: {
        createTaskRunner: () => ({
          run: async () => undefined,
        }),
      },
    });

    const promise = runner.run();
    await runner.cancel();
    await promise;

    sinon.assert.calledWithMatch(logger.warning, /not cancellable/);
  });

  interface TestOpts {
    instance?: Partial<ConcreteTaskInstance>;
    definition?: Partial<TaskDefinition>;
  }

  // Builds a TaskManagerRunner with stubbed logger/store and sensible defaults,
  // letting each test override only the instance/definition fields it cares about.
  function testOpts(opts: TestOpts) {
    const callCluster = sinon.stub();
    const createTaskRunner = sinon.stub();
    const logger = {
      error: sinon.stub(),
      debug: sinon.stub(),
      info: sinon.stub(),
      warning: sinon.stub(),
    };
    const store = {
      update: sinon.stub(),
      remove: sinon.stub(),
    };
    const runner = new TaskManagerRunner({
      kbnServer: sinon.stub(),
      beforeRun: context => Promise.resolve(context),
      logger,
      store,
      instance: Object.assign(
        {
          id: 'foo',
          taskType: 'bar',
          version: 32,
          runAt: new Date(),
          attempts: 0,
          params: {},
          scope: 'reporting',
          state: {},
          status: 'idle',
          user: 'example',
        },
        opts.instance || {}
      ),
      definition: Object.assign(
        {
          type: 'bar',
          title: 'Bar!',
          createTaskRunner,
        },
        opts.definition || {}
      ),
    });

    return {
      callCluster,
      createTaskRunner,
      runner,
      logger,
      store,
    };
  }

  // Runs a task returning `result` and asserts whether the invalid-result
  // warning was (or wasn't) logged, failing with a descriptive message.
  async function testReturn(result: any, shouldBeValid: boolean) {
    const { runner, logger } = testOpts({
      definition: {
        createTaskRunner: () => ({
          run: async () => result,
        }),
      },
    });

    await runner.run();

    try {
      if (shouldBeValid) {
        sinon.assert.notCalled(logger.warning);
      } else {
        sinon.assert.calledWith(logger.warning, sinon.match(/invalid task result/i));
      }
    } catch (err) {
      sinon.assert.fail(
        `Expected result ${JSON.stringify(result)} to be ${shouldBeValid ? 'valid' : 'invalid'}`
      );
    }
  }

  function allowsReturnType(result: any) {
    return testReturn(result, true);
  }

  function disallowsReturnType(result: any) {
    return testReturn(result, false);
  }
});
|
|
@ -1,234 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This module contains the core logic for running an individual task.
|
||||
* It handles the full lifecycle of a task run, including error handling,
|
||||
* rescheduling, middleware application, etc.
|
||||
*/
|
||||
|
||||
import Joi from 'joi';
|
||||
import { intervalFromNow, minutesFromNow } from './lib/intervals';
|
||||
import { Logger } from './lib/logger';
|
||||
import { BeforeRunFunction } from './lib/middleware';
|
||||
import {
|
||||
CancelFunction,
|
||||
CancellableTask,
|
||||
ConcreteTaskInstance,
|
||||
RunResult,
|
||||
TaskDefinition,
|
||||
validateRunResult,
|
||||
} from './task';
|
||||
import { RemoveResult } from './task_store';
|
||||
|
||||
export interface TaskRunner {
  // How many of the pool's worker slots this task occupies while running.
  numWorkers: number;
  // True once the task has run past its scheduled time and should be cancelled.
  isExpired: boolean;
  // Cancels the in-flight task, if the underlying task supports cancellation.
  cancel: CancelFunction;
  // Attempts to take exclusive ownership of the task; resolves false when
  // another Kibana instance claimed it first.
  claimOwnership: () => Promise<boolean>;
  // Executes the task and resolves with its outcome.
  run: () => Promise<RunResult>;
  // Optional log-friendly representation of the task.
  toString?: () => string;
}
|
||||
|
||||
interface Updatable {
|
||||
update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance>;
|
||||
remove(id: string): Promise<RemoveResult>;
|
||||
}
|
||||
|
||||
interface Opts {
|
||||
logger: Logger;
|
||||
definition: TaskDefinition;
|
||||
instance: ConcreteTaskInstance;
|
||||
store: Updatable;
|
||||
kbnServer: any;
|
||||
beforeRun: BeforeRunFunction;
|
||||
}
|
||||
|
||||
/**
 * Runs a background task, ensures that errors are properly handled,
 * allows for cancellation.
 *
 * @export
 * @class TaskManagerRunner
 * @implements {TaskRunner}
 */
export class TaskManagerRunner implements TaskRunner {
  // The live task created by definition.createTaskRunner; undefined until run()
  // and cleared again by cancel().
  private task?: CancellableTask;
  private instance: ConcreteTaskInstance;
  private definition: TaskDefinition;
  private logger: Logger;
  private store: Updatable;
  private kbnServer: any;
  private beforeRun: BeforeRunFunction;

  /**
   * Creates an instance of TaskManagerRunner.
   * @param {Opts} opts
   * @prop {Logger} logger - The task manager logger
   * @prop {TaskDefinition} definition - The definition of the task being run
   * @prop {ConcreteTaskInstance} instance - The record describing this particular task instance
   * @prop {Updatable} store - The store used to read / write tasks instance info
   * @prop {kbnServer} kbnServer - An async function that provides the task's run context
   * @prop {BeforeRunFunction} beforeRun - A function that adjusts the run context prior to running the task
   * @memberof TaskManagerRunner
   */
  constructor(opts: Opts) {
    // sanitizeInstance guarantees params/state are objects, never null/undefined.
    this.instance = sanitizeInstance(opts.instance);
    this.definition = opts.definition;
    this.logger = opts.logger;
    this.store = opts.store;
    this.kbnServer = opts.kbnServer;
    this.beforeRun = opts.beforeRun;
  }

  /**
   * Gets how many workers are occupied by this task instance.
   */
  public get numWorkers() {
    // Defaults to 1 when the definition does not specify a cost.
    return this.definition.numWorkers || 1;
  }

  /**
   * Gets the id of this task instance.
   */
  public get id() {
    return this.instance.id;
  }

  /**
   * Gets the task type of this task instance.
   */
  public get taskType() {
    return this.instance.taskType;
  }

  /**
   * Gets whether or not this task has run longer than its expiration setting allows.
   */
  public get isExpired() {
    // While running, runAt holds the expiration time (set in claimOwnership),
    // so a past runAt means the task has outlived its timeout.
    return this.instance.runAt < new Date();
  }

  /**
   * Returns a log-friendly representation of this task.
   */
  public toString() {
    return `${this.instance.taskType} "${this.instance.id}"`;
  }

  /**
   * Runs the task, handling the task result, errors, etc, rescheduling if need
   * be. NOTE: the time of applying the middleware's beforeRun is incorporated
   * into the total timeout time the task in configured with. We may decide to
   * start the timer after beforeRun resolves
   *
   * @returns {Promise<RunResult>}
   */
  public async run(): Promise<RunResult> {
    try {
      this.logger.debug(`Running task ${this}`);
      // Middleware gets a chance to adjust the run context before the task is created.
      const modifiedContext = await this.beforeRun({
        kbnServer: this.kbnServer,
        taskInstance: this.instance,
      });
      const task = this.definition.createTaskRunner(modifiedContext);
      this.task = task;
      return this.processResult(this.validateResult(await this.task.run()));
    } catch (error) {
      this.logger.warning(`Task ${this} failed ${error.stack}`);
      this.logger.debug(`Task ${JSON.stringify(this.instance)} failed ${error.stack}`);

      // A thrown error is converted into a failed result so it is rescheduled
      // with an incremented attempt count rather than lost.
      return this.processResult({ error });
    }
  }

  /**
   * Attempts to claim exclusive rights to run the task. If the attempt fails
   * with a 409 (http conflict), we assume another Kibana instance beat us to the punch.
   *
   * @returns {Promise<boolean>}
   */
  public async claimOwnership(): Promise<boolean> {
    const VERSION_CONFLICT_STATUS = 409;

    try {
      // Claiming = optimistic-concurrency update to status 'running'; runAt is
      // repurposed to hold the expiration time while running.
      // NOTE(review): `timeOut` casing looks unusual — confirm against TaskDefinition.
      this.instance = await this.store.update({
        ...this.instance,
        status: 'running',
        runAt: intervalFromNow(this.definition.timeOut)!,
      });

      return true;
    } catch (error) {
      // 409 means someone else claimed it first; anything else is a real failure.
      if (error.statusCode !== VERSION_CONFLICT_STATUS) {
        throw error;
      }
    }

    return false;
  }

  /**
   * Attempts to cancel the task.
   *
   * @returns {Promise<void>}
   */
  public async cancel() {
    const { task } = this;
    if (task && task.cancel) {
      // Clear this.task first so a second cancel() does not double-cancel.
      this.task = undefined;
      return task.cancel();
    }

    this.logger.warning(`The task ${this} is not cancellable.`);
  }

  private validateResult(result?: RunResult | void): RunResult {
    const { error } = Joi.validate(result, validateRunResult);

    if (error) {
      // Invalid shapes are logged but tolerated; the raw result is still used.
      this.logger.warning(`Invalid task result for ${this}: ${error.message}`);
    }

    return result || {};
  }

  private async processResult(result: RunResult): Promise<RunResult> {
    // Explicit runAt from the task wins over the instance's recurrence interval.
    const runAt = result.runAt || intervalFromNow(this.instance.interval);
    const state = result.state || this.instance.state || {};

    if (runAt || result.error) {
      // Reschedule: errors back off by (attempts + 1) * 5 minutes when no
      // explicit next time exists; a success resets the attempt counter.
      await this.store.update({
        ...this.instance,
        runAt: runAt || minutesFromNow((this.instance.attempts + 1) * 5),
        state,
        attempts: result.error ? this.instance.attempts + 1 : 0,
      });
    } else {
      // Non-recurring task completed successfully: remove it from the store.
      try {
        await this.store.remove(this.instance.id);
      } catch (err) {
        if (err.statusCode === 404) {
          this.logger.warning(
            `Task cleanup of ${this} failed in processing. Was remove called twice?`
          );
        } else {
          throw err;
        }
      }
    }

    return result;
  }
}
|
||||
|
||||
function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance {
|
||||
return {
|
||||
...instance,
|
||||
params: instance.params || {},
|
||||
state: instance.state || {},
|
||||
};
|
||||
}
|
|
@ -1,519 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import sinon from 'sinon';
|
||||
import { TaskInstance, TaskStatus } from './task';
|
||||
import { FetchOpts, TaskStore } from './task_store';
|
||||
|
||||
describe('TaskStore', () => {
|
||||
describe('init', () => {
|
||||
test('creates the task manager index', async () => {
|
||||
const callCluster = sinon.spy();
|
||||
const store = new TaskStore({
|
||||
callCluster,
|
||||
index: 'tasky',
|
||||
maxAttempts: 2,
|
||||
supportedTypes: ['a', 'b', 'c'],
|
||||
});
|
||||
|
||||
await store.init();
|
||||
|
||||
sinon.assert.calledOnce(callCluster);
|
||||
|
||||
callCluster.calledOnceWith('indices.create', {
|
||||
index: 'tasky',
|
||||
settings: {
|
||||
number_of_shards: 1,
|
||||
auto_expand_replicas: '0-1',
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
test('patches the task manager index mappings if the index already exists', async () => {
|
||||
const callCluster = sinon.spy((path: string) => {
|
||||
if (path === 'indices.create') {
|
||||
return Promise.reject({ body: { error: { type: 'resource_already_exists_exception' } } });
|
||||
}
|
||||
});
|
||||
const store = new TaskStore({
|
||||
callCluster,
|
||||
index: 'taskalicious',
|
||||
maxAttempts: 2,
|
||||
supportedTypes: ['a', 'b', 'c'],
|
||||
});
|
||||
|
||||
await store.init();
|
||||
|
||||
sinon.assert.calledTwice(callCluster);
|
||||
|
||||
callCluster.calledOnceWith('indices.putMapping', {
|
||||
index: 'taskalicious',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('schedule', () => {
|
||||
async function testSchedule(task: TaskInstance) {
|
||||
const callCluster = sinon.spy(() =>
|
||||
Promise.resolve({
|
||||
_id: 'testid',
|
||||
_version: 3344,
|
||||
})
|
||||
);
|
||||
const store = new TaskStore({
|
||||
callCluster,
|
||||
index: 'tasky',
|
||||
maxAttempts: 2,
|
||||
supportedTypes: ['report', 'dernstraight', 'yawn'],
|
||||
});
|
||||
|
||||
const result = await store.schedule(task);
|
||||
|
||||
sinon.assert.calledOnce(callCluster);
|
||||
|
||||
return { result, callCluster, arg: callCluster.args[0][1] };
|
||||
}
|
||||
|
||||
test('serializes the params and state', async () => {
|
||||
const task = {
|
||||
params: { hello: 'world' },
|
||||
state: { foo: 'bar' },
|
||||
taskType: 'report',
|
||||
};
|
||||
const { callCluster, arg } = await testSchedule(task);
|
||||
|
||||
sinon.assert.calledOnce(callCluster);
|
||||
sinon.assert.calledWith(callCluster, 'index');
|
||||
|
||||
expect(arg).toMatchObject({
|
||||
index: 'tasky',
|
||||
type: '_doc',
|
||||
body: {
|
||||
task: {
|
||||
params: JSON.stringify(task.params),
|
||||
state: JSON.stringify(task.state),
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
test('retiurns a concrete task instance', async () => {
|
||||
const task = {
|
||||
params: { hello: 'world' },
|
||||
state: { foo: 'bar' },
|
||||
taskType: 'report',
|
||||
};
|
||||
const { result } = await testSchedule(task);
|
||||
|
||||
expect(result).toMatchObject({
|
||||
...task,
|
||||
version: 3344,
|
||||
id: 'testid',
|
||||
});
|
||||
});
|
||||
|
||||
test('sets runAt to now if not specified', async () => {
|
||||
const now = Date.now();
|
||||
const { arg } = await testSchedule({ taskType: 'dernstraight', params: {} });
|
||||
expect(arg.body.task.runAt.getTime()).toBeGreaterThanOrEqual(now);
|
||||
});
|
||||
|
||||
test('ensures params and state are not null', async () => {
|
||||
const { arg } = await testSchedule({ taskType: 'yawn' } as any);
|
||||
expect(arg.body.task.params).toEqual('{}');
|
||||
expect(arg.body.task.state).toEqual('{}');
|
||||
});
|
||||
|
||||
test('errors if the task type is unknown', async () => {
|
||||
await expect(testSchedule({ taskType: 'nope', params: {} })).rejects.toThrow(
|
||||
/Unsupported task type "nope"/i
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
// Tests for TaskStore.fetch: query construction (type filter, sort,
// search_after pagination) and conversion of raw hits into task docs.
describe('fetch', () => {
  // Runs store.fetch(opts) against a stubbed cluster that returns `hits`,
  // returning both the fetch result and the args sent to the search call.
  async function testFetch(opts?: FetchOpts, hits: any[] = []) {
    const callCluster = sinon.spy(async () => ({ hits: { hits } }));
    const store = new TaskStore({
      callCluster,
      index: 'tasky',
      maxAttempts: 2,
      supportedTypes: ['a', 'b', 'c'],
    });

    const result = await store.fetch(opts);

    sinon.assert.calledOnce(callCluster);
    sinon.assert.calledWith(callCluster, 'search');

    return {
      result,
      args: callCluster.args[0][1],
    };
  }

  test('empty call filters by type, sorts by runAt and id', async () => {
    const { args } = await testFetch();
    expect(args).toMatchObject({
      type: '_doc',
      index: 'tasky',
      body: {
        sort: [{ 'task.runAt': 'asc' }, { _id: 'desc' }],
        query: { term: { type: 'task' } },
      },
    });
  });

  // A caller-supplied query must be AND-ed with the mandatory type filter.
  test('allows custom queries', async () => {
    const { args } = await testFetch({
      query: {
        term: { 'task.taskType': 'bar' },
      },
    });

    expect(args).toMatchObject({
      body: {
        query: {
          bool: {
            must: [{ term: { type: 'task' } }, { term: { 'task.taskType': 'bar' } }],
          },
        },
      },
    });
  });

  // An _id tie-breaker is appended so search_after pagination stays stable.
  test('sorts by id if custom sort does not have an id sort in it', async () => {
    const { args } = await testFetch({
      sort: [{ 'task.taskType': 'desc' }],
    });

    expect(args).toMatchObject({
      body: {
        sort: [{ 'task.taskType': 'desc' }, { _id: 'desc' }],
      },
    });
  });

  test('allows custom sort by id', async () => {
    const { args } = await testFetch({
      sort: [{ _id: 'asc' }],
    });

    expect(args).toMatchObject({
      body: {
        sort: [{ _id: 'asc' }],
      },
    });
  });

  // searchAfter is passed straight through as Elasticsearch's search_after.
  test('allows specifying pagination', async () => {
    const now = new Date();
    const searchAfter = [now, '143243sdafa32'];
    const { args } = await testFetch({
      searchAfter,
    });

    expect(args).toMatchObject({
      body: {
        search_after: searchAfter,
      },
    });
  });

  // Hits are converted to concrete task instances (params/state parsed),
  // and the last hit's sort value is surfaced as the searchAfter cursor.
  test('returns paginated tasks', async () => {
    const runAt = new Date();
    const { result } = await testFetch(undefined, [
      {
        _id: 'aaa',
        _source: {
          type: 'task',
          task: {
            runAt,
            taskType: 'foo',
            interval: undefined,
            attempts: 0,
            status: 'idle',
            params: '{ "hello": "world" }',
            state: '{ "baby": "Henhen" }',
            user: 'jimbo',
            scope: 'reporting',
          },
        },
        sort: ['a', 1],
      },
      {
        _id: 'bbb',
        _source: {
          type: 'task',
          task: {
            runAt,
            taskType: 'bar',
            interval: '5m',
            attempts: 2,
            status: 'running',
            params: '{ "shazm": 1 }',
            state: '{ "henry": "The 8th" }',
            user: 'dabo',
            scope: ['reporting', 'ceo'],
          },
        },
        sort: ['b', 2],
      },
    ]);

    expect(result).toEqual({
      docs: [
        {
          attempts: 0,
          id: 'aaa',
          interval: undefined,
          params: { hello: 'world' },
          runAt,
          scope: 'reporting',
          state: { baby: 'Henhen' },
          status: 'idle',
          taskType: 'foo',
          user: 'jimbo',
          version: undefined,
        },
        {
          attempts: 2,
          id: 'bbb',
          interval: '5m',
          params: { shazm: 1 },
          runAt,
          scope: ['reporting', 'ceo'],
          state: { henry: 'The 8th' },
          status: 'running',
          taskType: 'bar',
          user: 'dabo',
          version: undefined,
        },
      ],
      searchAfter: ['b', 2],
    });
  });
});
|
||||
|
||||
// Tests for TaskStore.fetchAvailableTasks: the claim query (supported
// types, attempt cap, due runAt) and hit-to-task conversion.
describe('fetchAvailableTasks', () => {
  // Builds a store over a stubbed cluster returning `hits`, runs
  // fetchAvailableTasks(), and returns the result plus the search args.
  async function testFetchAvailableTasks({ opts = {}, hits = [] }: any = {}) {
    const callCluster = sinon.spy(async () => ({ hits: { hits } }));
    const store = new TaskStore({
      callCluster,
      supportedTypes: ['a', 'b', 'c'],
      index: 'tasky',
      maxAttempts: 2,
      ...opts,
    });

    const result = await store.fetchAvailableTasks();

    sinon.assert.calledOnce(callCluster);
    sinon.assert.calledWith(callCluster, 'search');

    return {
      result,
      args: callCluster.args[0][1],
    };
  }

  test('it filters tasks by supported types, maxAttempts, and runAt', async () => {
    // Randomized config values confirm the query is built from the store's
    // options rather than from hard-coded defaults.
    const maxAttempts = _.random(2, 43);
    const index = `index_${_.random(1, 234)}`;
    const { args } = await testFetchAvailableTasks({
      opts: {
        index,
        maxAttempts,
        supportedTypes: ['foo', 'bar'],
      },
    });
    expect(args).toMatchObject({
      body: {
        query: {
          bool: {
            must: [
              { term: { type: 'task' } },
              {
                bool: {
                  must: [
                    { terms: { 'task.taskType': ['foo', 'bar'] } },
                    { range: { 'task.attempts': { lte: maxAttempts } } },
                    { range: { 'task.runAt': { lte: 'now' } } },
                  ],
                },
              },
            ],
          },
        },
        size: 10,
        sort: { 'task.runAt': { order: 'asc' } },
        version: true,
      },
      index,
      type: '_doc',
    });
  });

  // Raw hits come back as deserialized concrete task instances.
  test('it returns task objects', async () => {
    const runAt = new Date();
    const { result } = await testFetchAvailableTasks({
      hits: [
        {
          _id: 'aaa',
          _source: {
            type: 'task',
            task: {
              runAt,
              taskType: 'foo',
              interval: undefined,
              attempts: 0,
              status: 'idle',
              params: '{ "hello": "world" }',
              state: '{ "baby": "Henhen" }',
              user: 'jimbo',
              scope: 'reporting',
            },
          },
          sort: ['a', 1],
        },
        {
          _id: 'bbb',
          _source: {
            type: 'task',
            task: {
              runAt,
              taskType: 'bar',
              interval: '5m',
              attempts: 2,
              status: 'running',
              params: '{ "shazm": 1 }',
              state: '{ "henry": "The 8th" }',
              user: 'dabo',
              scope: ['reporting', 'ceo'],
            },
          },
          sort: ['b', 2],
        },
      ],
    });
    expect(result).toMatchObject([
      {
        attempts: 0,
        id: 'aaa',
        interval: undefined,
        params: { hello: 'world' },
        runAt,
        scope: 'reporting',
        state: { baby: 'Henhen' },
        status: 'idle',
        taskType: 'foo',
        user: 'jimbo',
        version: undefined,
      },
      {
        attempts: 2,
        id: 'bbb',
        interval: '5m',
        params: { shazm: 1 },
        runAt,
        scope: ['reporting', 'ceo'],
        state: { henry: 'The 8th' },
        status: 'running',
        taskType: 'bar',
        user: 'dabo',
        version: undefined,
      },
    ]);
  });
});
|
||||
|
||||
// Tests for TaskStore.update: the call must be versioned, refresh the
// index, strip id/version from the stored doc, and return the task with
// the version reported back by Elasticsearch.
describe('update', () => {
  test('refreshes the index, handles versioning', async () => {
    const runAt = new Date();
    const task = {
      runAt,
      id: 'task:324242',
      params: { hello: 'world' },
      state: { foo: 'bar' },
      taskType: 'report',
      version: 2,
      attempts: 3,
      status: 'idle' as TaskStatus,
    };

    // The stubbed cluster reports the incremented version, as ES would
    // after a successful versioned update.
    const callCluster = sinon.spy(async () => ({ _version: task.version + 1 }));
    const store = new TaskStore({
      callCluster,
      index: 'tasky',
      maxAttempts: 2,
      supportedTypes: ['a', 'b', 'c'],
    });

    const result = await store.update(task);

    sinon.assert.calledOnce(callCluster);
    sinon.assert.calledWith(callCluster, 'update');

    expect(callCluster.args[0][1]).toMatchObject({
      id: task.id,
      index: 'tasky',
      type: '_doc',
      version: 2,
      refresh: true,
      body: {
        doc: {
          task: {
            // id and version live on the raw document itself, not inside
            // _source.task, so they must be omitted here.
            ...['id', 'version'].reduce((acc, prop) => _.omit(acc, prop), task),
            params: JSON.stringify(task.params),
            state: JSON.stringify(task.state),
          },
        },
      },
    });

    expect(result).toEqual({ ...task, version: 3 });
  });
});
|
||||
|
||||
// Tests for TaskStore.remove: delete by id with refresh, and normalize
// the Elasticsearch response fields into the RemoveResult shape.
describe('remove', () => {
  test('removes the task with the specified id', async () => {
    const id = `id-${_.random(1, 20)}`;
    const callCluster = sinon.spy(() =>
      Promise.resolve({
        _index: 'myindex',
        _id: id,
        _version: 32,
        result: 'deleted',
      })
    );
    const store = new TaskStore({
      callCluster,
      index: 'myindex',
      maxAttempts: 2,
      supportedTypes: ['a'],
    });
    const result = await store.remove(id);

    sinon.assert.calledOnce(callCluster);
    sinon.assert.calledWith(callCluster, 'delete');

    expect(result).toEqual({
      id,
      index: 'myindex',
      version: 32,
      result: 'deleted',
    });

    expect(callCluster.args[0][1]).toMatchObject({
      id,
      index: 'myindex',
      type: '_doc',
      refresh: true,
    });
  });
});
|
||||
});
|
|
@ -1,359 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This module contains helpers for managing the task manager storage layer.
|
||||
*/
|
||||
|
||||
import { ConcreteTaskInstance, ElasticJs, TaskInstance, TaskStatus } from './task';
|
||||
|
||||
// All task documents are stored under this single Elasticsearch doc type.
const DOC_TYPE = '_doc';

// Constructor options for TaskStore.
export interface StoreOpts {
  callCluster: ElasticJs; // Elasticsearch connection used for every call
  index: string; // name of the task manager index
  maxAttempts: number; // tasks beyond this attempt count are not claimed
  supportedTypes: string[]; // task types this store schedules / fetches
}

// Options accepted by TaskStore.fetch.
export interface FetchOpts {
  searchAfter?: any[]; // search_after cursor from a previous page
  sort?: object[]; // caller-supplied sort (an _id tie-break is added)
  query?: object; // extra query AND-ed with the mandatory type filter
}

// One page of fetched tasks plus the cursor for the next page.
export interface FetchResult {
  searchAfter: any[];
  docs: ConcreteTaskInstance[];
}

// Normalized result of deleting a task document.
export interface RemoveResult {
  index: string;
  id: string;
  version: string;
  result: string;
}

// Internal, the raw document, as stored in the Kibana index.
export interface RawTaskDoc {
  _id: string;
  _index: string;
  _type: string;
  _version: number;
  _source: {
    type: string;
    task: {
      taskType: string;
      runAt: Date;
      interval?: string;
      attempts: number;
      status: TaskStatus;
      params: string; // JSON-serialized task params
      state: string; // JSON-serialized task state
      user?: string;
      scope?: string | string[];
    };
  };
}
|
||||
|
||||
/**
 * Wraps an elasticsearch connection and provides a task manager-specific
 * interface into the index.
 */
export class TaskStore {
  private callCluster: ElasticJs; // Elasticsearch connection used for all calls
  private index: string; // name of the task manager index
  private maxAttempts: number; // tasks past this attempt count are not claimed
  private supportedTypes: string[]; // task types this store schedules / fetches

  /**
   * Constructs a new TaskStore.
   * @param {StoreOpts} opts
   * @prop {CallCluster} callCluster - The elastic search connection
   * @prop {string} index - The name of the task manager index
   * @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned
   * @prop {string[]} supportedTypes - The task types supported by this store
   */
  constructor(opts: StoreOpts) {
    this.callCluster = opts.callCluster;
    this.index = opts.index;
    this.maxAttempts = opts.maxAttempts;
    this.supportedTypes = opts.supportedTypes;
  }

  /**
   * Initializes the store, ensuring the task manager index is created and up to date.
   */
  public async init() {
    // Mapping shared by both the create-index and put-mapping paths below.
    const properties = {
      type: { type: 'keyword' },
      task: {
        properties: {
          taskType: { type: 'keyword' },
          runAt: { type: 'date' },
          interval: { type: 'text' },
          attempts: { type: 'integer' },
          status: { type: 'keyword' },
          params: { type: 'text' },
          state: { type: 'text' },
          user: { type: 'keyword' },
          scope: { type: 'keyword' },
        },
      },
    };

    try {
      await this.callCluster('indices.create', {
        index: this.index,
        body: {
          mappings: {
            _doc: {
              // 'strict' rejects documents containing unmapped fields.
              dynamic: 'strict',
              properties,
            },
          },
          settings: {
            number_of_shards: 1,
            auto_expand_replicas: '0-1',
          },
        },
      });
    } catch (err) {
      // Only "index already exists" is recoverable; rethrow anything else.
      if (
        !err.body ||
        !err.body.error ||
        err.body.error.type !== 'resource_already_exists_exception'
      ) {
        throw err;
      }
      // The index exists already; make sure its mapping is up to date.
      return this.callCluster('indices.putMapping', {
        index: this.index,
        type: DOC_TYPE,
        body: {
          properties,
        },
      });
    }
  }

  /**
   * Schedules a task.
   *
   * @param task - The task being scheduled.
   */
  public async schedule(taskInstance: TaskInstance): Promise<ConcreteTaskInstance> {
    if (!this.supportedTypes.includes(taskInstance.taskType)) {
      throw new Error(`Unsupported task type "${taskInstance.taskType}".`);
    }

    // rawSource fills in defaults (runAt, status, attempts) and
    // JSON-serializes params/state before the doc is indexed.
    const { id, ...body } = rawSource(taskInstance);
    const result = await this.callCluster('index', {
      id,
      body,
      index: this.index,
      type: DOC_TYPE,
      refresh: true,
    });

    const { task } = body;
    return {
      ...taskInstance,
      id: result._id,
      version: result._version,
      attempts: 0,
      status: task.status,
      runAt: task.runAt,
      state: taskInstance.state || {},
    };
  }

  /**
   * Fetches a paginatable list of scheduled tasks.
   *
   * @param opts - The query options used to filter tasks
   */
  public async fetch(opts: FetchOpts = {}): Promise<FetchResult> {
    // Guarantee an _id tie-break so search_after pagination is stable.
    const sort = paginatableSort(opts.sort);
    return this.search({
      sort,
      search_after: opts.searchAfter,
      query: opts.query,
    });
  }

  /**
   * Fetches tasks from the index, which are ready to be run.
   * - runAt is now or past
   * - id is not currently running in this instance of Kibana
   * - has a type that is in our task definitions
   *
   * @param {TaskQuery} query
   * @prop {string[]} types - Task types to be queried
   * @prop {number} size - The number of task instances to retrieve
   * @returns {Promise<ConcreteTaskInstance[]>}
   */
  public fetchAvailableTasks = async () => {
    const { docs } = await this.search({
      query: {
        bool: {
          must: [
            { terms: { 'task.taskType': this.supportedTypes } },
            { range: { 'task.attempts': { lte: this.maxAttempts } } },
            { range: { 'task.runAt': { lte: 'now' } } },
          ],
        },
      },
      size: 10,
      sort: { 'task.runAt': { order: 'asc' } },
      // Request _version on hits so updates can use optimistic concurrency.
      version: true,
    });

    return docs;
  };

  /**
   * Updates the specified doc in the index, returning the doc
   * with its version up to date.
   *
   * @param {TaskDoc} doc
   * @returns {Promise<TaskDoc>}
   */
  public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
    const rawDoc = taskDocToRaw(doc, this.index);

    const { _version } = await this.callCluster('update', {
      body: {
        doc: rawDoc._source,
      },
      id: doc.id,
      index: this.index,
      type: DOC_TYPE,
      // Pass the version we read so a concurrent modification fails the call.
      version: doc.version,
      // The refresh is important so that if we immediately look for work,
      // we don't pick up this task.
      refresh: true,
    });

    return {
      ...doc,
      version: _version,
    };
  }

  /**
   * Removes the specified task from the index.
   *
   * @param {string} id
   * @returns {Promise<void>}
   */
  public async remove(id: string): Promise<RemoveResult> {
    const result = await this.callCluster('delete', {
      id,
      index: this.index,
      type: DOC_TYPE,
      // The refresh is important so that if we immediately look for work,
      // we don't pick up this task.
      refresh: true,
    });

    return {
      index: result._index,
      id: result._id,
      version: result._version,
      result: result.result,
    };
  }

  // Runs a search restricted to task documents, converting hits into
  // ConcreteTaskInstances and surfacing the last hit's sort for pagination.
  private async search(opts: any = {}): Promise<FetchResult> {
    const originalQuery = opts.query;
    const queryOnlyTasks = { term: { type: 'task' } };
    const query = originalQuery
      ? { bool: { must: [queryOnlyTasks, originalQuery] } }
      : queryOnlyTasks;

    const result = await this.callCluster('search', {
      type: DOC_TYPE,
      index: this.index,
      body: {
        ...opts,
        query,
      },
    });

    const rawDocs = result.hits.hits;

    return {
      docs: (rawDocs as RawTaskDoc[]).map(rawToTaskDoc),
      searchAfter: (rawDocs.length && rawDocs[rawDocs.length - 1].sort) || [],
    };
  }
}
|
||||
|
||||
function paginatableSort(sort: any[] = []) {
|
||||
const sortById = { _id: 'desc' };
|
||||
|
||||
if (!sort.length) {
|
||||
return [{ 'task.runAt': 'asc' }, sortById];
|
||||
}
|
||||
|
||||
if (sort.find(({ _id }) => !!_id)) {
|
||||
return sort;
|
||||
}
|
||||
|
||||
return [...sort, sortById];
|
||||
}
|
||||
|
||||
function rawSource(doc: ConcreteTaskInstance | TaskInstance) {
|
||||
const { id, ...taskFields } = doc;
|
||||
const source = {
|
||||
...taskFields,
|
||||
params: JSON.stringify(doc.params || {}),
|
||||
state: JSON.stringify(doc.state || {}),
|
||||
attempts: (doc as ConcreteTaskInstance).attempts || 0,
|
||||
runAt: doc.runAt || new Date(),
|
||||
status: (doc as ConcreteTaskInstance).status || 'idle',
|
||||
};
|
||||
|
||||
delete (source as any).id;
|
||||
delete (source as any).version;
|
||||
delete (source as any).type;
|
||||
|
||||
return {
|
||||
id,
|
||||
type: 'task',
|
||||
task: source,
|
||||
};
|
||||
}
|
||||
|
||||
function taskDocToRaw(doc: ConcreteTaskInstance, index: string): RawTaskDoc {
|
||||
const { type, task } = rawSource(doc);
|
||||
|
||||
return {
|
||||
_id: doc.id,
|
||||
_index: index,
|
||||
_source: { type, task },
|
||||
_type: DOC_TYPE,
|
||||
_version: doc.version,
|
||||
};
|
||||
}
|
||||
|
||||
function rawToTaskDoc(doc: RawTaskDoc): ConcreteTaskInstance {
|
||||
return {
|
||||
...doc._source.task,
|
||||
id: doc._id,
|
||||
version: doc._version,
|
||||
params: parseJSONField(doc._source.task.params, 'params', doc),
|
||||
state: parseJSONField(doc._source.task.state, 'state', doc),
|
||||
};
|
||||
}
|
||||
|
||||
function parseJSONField(json: string, fieldName: string, doc: RawTaskDoc) {
|
||||
try {
|
||||
return json ? JSON.parse(json) : {};
|
||||
} catch (error) {
|
||||
throw new Error(`Task "${doc._id}"'s ${fieldName} field has invalid JSON: ${json}`);
|
||||
}
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* A handful of helper functions for testing the task manager.
|
||||
*/
|
||||
|
||||
import sinon from 'sinon';
|
||||
|
||||
// Caching this here to avoid setTimeout mocking affecting our tests.
|
||||
const nativeTimeout = setTimeout;
|
||||
|
||||
/**
|
||||
* Creates a mock task manager Logger.
|
||||
*/
|
||||
export function mockLogger() {
|
||||
return {
|
||||
info: sinon.stub(),
|
||||
debug: sinon.stub(),
|
||||
warning: sinon.stub(),
|
||||
error: sinon.stub(),
|
||||
};
|
||||
}
|
||||
|
||||
interface Resolvable {
|
||||
resolve: () => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a promise which can be resolved externally, useful for
|
||||
* coordinating async tests.
|
||||
*/
|
||||
export function resolvable(): PromiseLike<void> & Resolvable {
|
||||
let resolve: () => void;
|
||||
const result = new Promise<void>(r => (resolve = r)) as any;
|
||||
|
||||
result.resolve = () => nativeTimeout(resolve, 0);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
 * A simple helper for waiting a specified number of milliseconds.
 *
 * @param {number} ms
 */
export async function sleep(ms: number) {
  // Uses the cached native setTimeout so that setTimeout mocking in tests
  // cannot stall or fake the delay.
  return new Promise(r => nativeTimeout(r, ms));
}
|
|
@ -12,7 +12,6 @@ require('@kbn/test').runTestsCli([
|
|||
require.resolve('../test/reporting/configs/phantom_functional.js'),
|
||||
require.resolve('../test/functional/config.js'),
|
||||
require.resolve('../test/api_integration/config.js'),
|
||||
require.resolve('../test/plugin_api_integration/config.js'),
|
||||
require.resolve('../test/saml_api_integration/config.js'),
|
||||
require.resolve('../test/spaces_api_integration/spaces_only/config'),
|
||||
require.resolve('../test/spaces_api_integration/security_and_spaces/config'),
|
||||
|
|
|
@ -1,46 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import path from 'path';
|
||||
import fs from 'fs';
|
||||
|
||||
// Functional test runner configuration for the plugin API integration
// suite. It extends the api_integration config, borrows the retry service
// from the main functional config, and loads every directory under
// ./plugins as a test plugin in the Kibana server under test.
export default async function ({ readConfigFile }) {
  const integrationConfig = await readConfigFile(require.resolve('../api_integration/config'));
  const kibanaFunctionalConfig = await readConfigFile(require.resolve('../../../test/functional/config.js'));

  // Find all folders in ./plugins since we treat all them as plugin folder
  const allFiles = fs.readdirSync(path.resolve(__dirname, 'plugins'));
  const plugins = allFiles.filter(file => fs.statSync(path.resolve(__dirname, 'plugins', file)).isDirectory());

  return {
    testFiles: [
      require.resolve('./test_suites/task_manager'),
    ],
    services: {
      // retry comes from the functional config; all other services are
      // inherited from the api_integration config.
      retry: kibanaFunctionalConfig.get('services.retry'),
      ...integrationConfig.get('services'),
    },
    pageObjects: integrationConfig.get('pageObjects'),
    servers: integrationConfig.get('servers'),
    esTestCluster: integrationConfig.get('esTestCluster'),
    apps: integrationConfig.get('apps'),
    esArchiver: {
      directory: path.resolve(__dirname, '../es_archives')
    },
    screenshots: integrationConfig.get('screenshots'),
    junit: {
      reportName: 'Plugin Functional Tests',
    },
    kbnTestServer: {
      ...integrationConfig.get('kbnTestServer'),
      serverArgs: [
        ...integrationConfig.get('kbnTestServer.serverArgs'),
        // Register each discovered plugin directory with Kibana.
        ...plugins.map(pluginDir => `--plugin-path=${path.resolve(__dirname, 'plugins', pluginDir)}`),
      ],
    },
  };
}
|
||||
|
|
@ -1,106 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { initRoutes } from './init_routes';
|
||||
|
||||
// Kibana plugin that registers the configurable "sampleTask" task type and
// a small HTTP API, used by the task manager integration tests.
export default function (kibana) {
  return new kibana.Plugin({
    name: 'sampleTask',
    require: ['elasticsearch', 'task_manager'],

    config(Joi) {
      return Joi.object({
        enabled: Joi.boolean().default(true),
      }).default();
    },

    init(server) {
      const { taskManager } = server;

      taskManager.registerTaskDefinitions({
        sampleTask: {
          title: 'Sample Task',
          description: 'A sample task for testing the task_manager.',
          timeOut: '1m',
          numWorkers: 2,

          // This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc)
          // taskInstance.params has the following optional fields:
          // nextRunMilliseconds: number - If specified, the run method will return a runAt that is now + nextRunMilliseconds
          // failWith: string - If specified, the task will throw an error with the specified message
          createTaskRunner: ({ kbnServer, taskInstance }) => ({
            async run() {
              const { params, state } = taskInstance;
              const prevState = state || { count: 0 };

              if (params.failWith) {
                throw new Error(params.failWith);
              }

              // Record each run in a test-only index so the integration
              // tests can observe that (and how) the task executed.
              const callCluster = kbnServer.server.plugins.elasticsearch.getCluster('admin').callWithInternalUser;
              await callCluster('index', {
                index: '.task_manager_test_result',
                type: '_doc',
                body: {
                  type: 'task',
                  taskId: taskInstance.id,
                  params: JSON.stringify(params),
                  state: JSON.stringify(state),
                  ranAt: new Date(),
                },
                refresh: true,
              });

              return {
                state: { count: (prevState.count || 0) + 1 },
                runAt: millisecondsFromNow(params.nextRunMilliseconds),
              };
            },
          }),
        },
      });

      // Middleware that wraps params on save and unwraps them on run, so
      // tests can verify middleware is applied in both directions.
      taskManager.addMiddleware({
        async beforeSave({ taskInstance, ...opts }) {
          const modifiedInstance = {
            ...taskInstance,
            params: {
              originalParams: taskInstance.params,
              superFly: 'My middleware param!',
            },
          };

          return {
            ...opts,
            taskInstance: modifiedInstance,
          };
        },

        async beforeRun({ taskInstance, ...opts }) {
          return {
            ...opts,
            taskInstance: {
              ...taskInstance,
              params: taskInstance.params.originalParams,
            },
          };
        },
      });

      initRoutes(server);
    },
  });
}
|
||||
|
||||
// Returns a Date `ms` milliseconds in the future, or undefined when no
// (or a falsy) offset is given - in which case the task is not rescheduled.
function millisecondsFromNow(ms) {
  if (!ms) {
    return;
  }

  return new Date(Date.now() + ms);
}
|
|
@ -1,60 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import Joi from 'joi';
|
||||
|
||||
/**
 * Registers the sample-task HTTP API used by the task manager
 * integration tests:
 *   POST   /api/sample_tasks - schedule a task
 *   GET    /api/sample_tasks - fetch all scheduled tasks
 *   DELETE /api/sample_tasks - remove every scheduled task
 */
export function initRoutes(server) {
  const { taskManager } = server;

  // Shape of the POST payload used to schedule a task.
  const taskPayload = Joi.object({
    taskType: Joi.string().required(),
    interval: Joi.string().optional(),
    params: Joi.object().required(),
    state: Joi.object().optional(),
    id: Joi.string().optional(),
  });

  server.route({
    path: '/api/sample_tasks',
    method: 'POST',
    config: {
      validate: {
        payload: taskPayload,
      },
    },
    async handler(request, reply) {
      try {
        const task = await taskManager.schedule(request.payload, { request });
        reply(task);
      } catch (err) {
        reply(err);
      }
    },
  });

  server.route({
    path: '/api/sample_tasks',
    method: 'GET',
    async handler(_req, reply) {
      try {
        reply(taskManager.fetch());
      } catch (err) {
        reply(err);
      }
    }
  });

  server.route({
    path: '/api/sample_tasks',
    method: 'DELETE',
    async handler(_req, reply) {
      try {
        const { docs: tasks } = await taskManager.fetch();
        reply(Promise.all(tasks.map((task) => taskManager.remove(task.id))));
      } catch (err) {
        reply(err);
      }
    },
  });
}
|
|
@ -1,4 +0,0 @@
|
|||
{
|
||||
"name": "sample_task_plugin",
|
||||
"version": "kibana"
|
||||
}
|
|
@ -1,11 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
// Registers the task_manager integration tests with the functional
// test runner.
export default function ({ loadTestFile }) {
  describe('task_manager', () => {
    loadTestFile(require.resolve('./task_manager_integration'));
  });
}
|
|
@ -1,163 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import expect from 'expect.js';
|
||||
import url from 'url';
|
||||
import supertestAsPromised from 'supertest-as-promised';
|
||||
|
||||
export default function ({ getService }) {
|
||||
const es = getService('es');
|
||||
const retry = getService('retry');
|
||||
const config = getService('config');
|
||||
const testHistoryIndex = '.task_manager_test_result';
|
||||
const supertest = supertestAsPromised(url.format(config.get('servers.kibana')));
|
||||
|
||||
describe('scheduling and running tasks', () => {
|
||||
beforeEach(() => supertest.delete('/api/sample_tasks')
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.expect(200));
|
||||
|
||||
beforeEach(async () =>
|
||||
(await es.indices.exists({ index: testHistoryIndex })) && es.deleteByQuery({
|
||||
index: testHistoryIndex,
|
||||
q: 'type:task',
|
||||
refresh: true,
|
||||
}));
|
||||
|
||||
function currentTasks() {
|
||||
return supertest.get('/api/sample_tasks')
|
||||
.expect(200)
|
||||
.then((response) => response.body);
|
||||
}
|
||||
|
||||
function historyDocs() {
|
||||
return es.search({
|
||||
index: testHistoryIndex,
|
||||
type: '_doc',
|
||||
q: 'type:task',
|
||||
}).then(result => result.hits.hits);
|
||||
}
|
||||
|
||||
function scheduleTask(task) {
|
||||
return supertest.post('/api/sample_tasks')
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.send(task)
|
||||
.expect(200)
|
||||
.then((response) => response.body);
|
||||
}
|
||||
|
||||
it('should support middleware', async () => {
|
||||
const historyItem = _.random(1, 100);
|
||||
|
||||
await scheduleTask({
|
||||
taskType: 'sampleTask',
|
||||
interval: '30m',
|
||||
params: { historyItem },
|
||||
});
|
||||
|
||||
await retry.try(async () => {
|
||||
expect((await historyDocs()).length).to.eql(1);
|
||||
|
||||
const [task] = (await currentTasks()).docs;
|
||||
|
||||
expect(task.attempts).to.eql(0);
|
||||
expect(task.state.count).to.eql(1);
|
||||
|
||||
expect(task.params).to.eql({
|
||||
superFly: 'My middleware param!',
|
||||
originalParams: { historyItem },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should remove non-recurring tasks after they complete', async () => {
|
||||
await scheduleTask({
|
||||
taskType: 'sampleTask',
|
||||
params: { },
|
||||
});
|
||||
|
||||
await retry.try(async () => {
|
||||
const history = await historyDocs();
|
||||
expect(history.length).to.eql(1);
|
||||
expect((await currentTasks()).docs).to.eql([]);
|
||||
});
|
||||
});
|
||||
|
||||
it('should use a given ID as the task document ID', async () => {
|
||||
const result = await scheduleTask({
|
||||
id: 'test-task-for-sample-task-plugin-to-test-task-manager',
|
||||
taskType: 'sampleTask',
|
||||
params: { },
|
||||
});
|
||||
|
||||
expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager');
|
||||
});
|
||||
|
||||
it('should reschedule if task errors', async () => {
  const task = await scheduleTask({
    taskType: 'sampleTask',
    params: { failWith: 'Dangit!!!!!' },
  });

  await retry.try(async () => {
    const { docs } = await currentTasks();
    const [scheduledTask] = docs;
    expect(scheduledTask.id).to.eql(task.id);
    // A failing run bumps the attempt counter and pushes runAt forward.
    expect(scheduledTask.attempts).to.be.greaterThan(0);
    expect(Date.parse(scheduledTask.runAt)).to.be.greaterThan(Date.parse(task.runAt));
  });
});
|
||||
|
||||
it('should reschedule if task returns runAt', async () => {
  const nextRunMilliseconds = _.random(60000, 200000);
  const initialCount = _.random(1, 20);

  const originalTask = await scheduleTask({
    taskType: 'sampleTask',
    params: { nextRunMilliseconds },
    state: { count: initialCount },
  });

  await retry.try(async () => {
    expect((await historyDocs()).length).to.eql(1);

    const [task] = (await currentTasks()).docs;
    expect(task.attempts).to.eql(0);
    // The sample task increments its state counter on every run.
    expect(task.state.count).to.eql(initialCount + 1);

    // `await` so a failed assertion inside the helper rejects this
    // retry.try callback instead of becoming an unhandled promise
    // rejection that would be silently swallowed.
    await expectReschedule(originalTask, task, nextRunMilliseconds);
  });
});
|
||||
|
||||
it('should reschedule if task has an interval', async () => {
  const interval = _.random(5, 200);
  const intervalMilliseconds = interval * 60000;

  const originalTask = await scheduleTask({
    taskType: 'sampleTask',
    interval: `${interval}m`,
    params: { },
  });

  await retry.try(async () => {
    expect((await historyDocs()).length).to.eql(1);

    const [task] = (await currentTasks()).docs;
    expect(task.attempts).to.eql(0);
    expect(task.state.count).to.eql(1);

    // `await` so a failed assertion inside the helper rejects this
    // retry.try callback instead of becoming an unhandled promise
    // rejection that would be silently swallowed.
    await expectReschedule(originalTask, task, intervalMilliseconds);
  });
});
|
||||
|
||||
/**
 * Asserts that `currentTask` was rescheduled roughly `expectedDiff`
 * milliseconds after `originalTask`'s original runAt, with a ±10s buffer
 * to absorb scheduler timing jitter.
 *
 * Intentionally synchronous: the original was declared `async` yet awaited
 * nothing, and callers invoke it without `await`, so a failed assertion
 * became an unhandled promise rejection instead of failing the enclosing
 * retry. Throwing synchronously propagates failures to all callers,
 * awaited or not.
 *
 * @param {object} originalTask - task doc as originally scheduled (`runAt` is a date string)
 * @param {object} currentTask - task doc after running and being rescheduled
 * @param {number} expectedDiff - expected runAt delta in milliseconds
 */
function expectReschedule(originalTask, currentTask, expectedDiff) {
  const delta = Date.parse(currentTask.runAt) - Date.parse(originalTask.runAt);
  const buffer = 10000;
  expect(delta).to.be.greaterThan(expectedDiff - buffer);
  expect(delta).to.be.lessThan(expectedDiff + buffer);
}
|
||||
});
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue