Task manager enhancements for error handling in alerting and actions (#39829) (#42004)

* Allow mtask definitions to overwrite default setting maxAttemps

* Leverage scheduledAt from task manager

* Treat maxAttempts like attempts and not retries

* Add support for second intervals

* Min 1 attempt

* Reverse relying on scheduledAt

* Add new startedAt attribute in task manager that keeps track when task started running

* Don't extend runAt when claiming a task

* Remove startedAt from state

* Attempt trying to define custom getBackpressureDelay function

* Pass error object to getBackpressureDelay

* Cleanup processResultForRecurringTask code

* Add backpressure to timed out tasks

* Change default timeout backpressure calculation

* getBackpressureDelay to return seconds instead of milliseconds

* Add comment for task store query

* Compress query

* Revert alert / actions specific code

* Add more interval tests

* Fix failing jest tests

* Fix test

* Add more unit tests

* Fix integration tests

* Fix sorting of tasks to process

* WIP

* Always provide error when getBackpressureDelay is called

* Rename getBackpressureDelay to getRetryDelay

* retryAt to be calculated from timeout time by default

* Remove invalid test

* Add unit tests

* Consider timeout before scheduling a retryAt

* Remove backpressure terminology

* Remove support for 0 based intervals and timeouts

* Apply PR feedback

* Fix last place using Math.abs

* Modify migrations to allow running a script when converting an index to an alias

* Convert task manager to use saved objects

* Fix broken test

* Fix broken tests pt1

* Remove index from task manager config schema

* Accept platform changes

* PR feedback

* Apply PR feedback

* Apply PR feedback pt2

* Apply PR feedback pt3

* Apply PR feedback pt4

* Fix feedback pt3

* Rename RawSavedObjectDoc to SavedObjectsRawDoc
This commit is contained in:
Mike Côté 2019-07-25 21:35:42 -04:00 committed by GitHub
parent 81f77df509
commit 44f8c129f7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
47 changed files with 1472 additions and 658 deletions

View file

@ -19,6 +19,8 @@ The plugin integrates with the core system via lifecycle events: `setup`<!-- -->
| [KibanaRequest](./kibana-plugin-server.kibanarequest.md) | Kibana specific abstraction for an incoming request. |
| [Router](./kibana-plugin-server.router.md) | |
| [SavedObjectsErrorHelpers](./kibana-plugin-server.savedobjectserrorhelpers.md) | |
| [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) | |
| [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) | |
| [ScopedClusterClient](./kibana-plugin-server.scopedclusterclient.md) | Serves the same purpose as "normal" <code>ClusterClient</code> but exposes additional <code>callAsCurrentUser</code> method that doesn't use credentials of the Kibana internal user (as <code>callAsInternalUser</code> does) to request Elasticsearch API, but rather passes HTTP headers extracted from the current user request to the API |
## Interfaces
@ -60,6 +62,7 @@ The plugin integrates with the core system via lifecycle events: `setup`<!-- -->
| [SavedObjectsFindOptions](./kibana-plugin-server.savedobjectsfindoptions.md) | |
| [SavedObjectsFindResponse](./kibana-plugin-server.savedobjectsfindresponse.md) | |
| [SavedObjectsMigrationVersion](./kibana-plugin-server.savedobjectsmigrationversion.md) | A dictionary of saved object type -<!-- -->&gt; version used to determine what migrations need to be applied to a saved object. |
| [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) | A raw document as represented directly in the saved object index. |
| [SavedObjectsService](./kibana-plugin-server.savedobjectsservice.md) | |
| [SavedObjectsUpdateOptions](./kibana-plugin-server.savedobjectsupdateoptions.md) | |
| [SavedObjectsUpdateResponse](./kibana-plugin-server.savedobjectsupdateresponse.md) | |

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) &gt; [\_id](./kibana-plugin-server.savedobjectsrawdoc._id.md)
## SavedObjectsRawDoc.\_id property
<b>Signature:</b>
```typescript
_id: string;
```

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) &gt; [\_primary\_term](./kibana-plugin-server.savedobjectsrawdoc._primary_term.md)
## SavedObjectsRawDoc.\_primary\_term property
<b>Signature:</b>
```typescript
_primary_term?: number;
```

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) &gt; [\_seq\_no](./kibana-plugin-server.savedobjectsrawdoc._seq_no.md)
## SavedObjectsRawDoc.\_seq\_no property
<b>Signature:</b>
```typescript
_seq_no?: number;
```

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) &gt; [\_source](./kibana-plugin-server.savedobjectsrawdoc._source.md)
## SavedObjectsRawDoc.\_source property
<b>Signature:</b>
```typescript
_source: any;
```

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) &gt; [\_type](./kibana-plugin-server.savedobjectsrawdoc._type.md)
## SavedObjectsRawDoc.\_type property
<b>Signature:</b>
```typescript
_type?: string;
```

View file

@ -0,0 +1,24 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md)
## SavedObjectsRawDoc interface
A raw document as represented directly in the saved object index.
<b>Signature:</b>
```typescript
export interface RawDoc
```
## Properties
| Property | Type | Description |
| --- | --- | --- |
| [\_id](./kibana-plugin-server.savedobjectsrawdoc._id.md) | <code>string</code> | |
| [\_primary\_term](./kibana-plugin-server.savedobjectsrawdoc._primary_term.md) | <code>number</code> | |
| [\_seq\_no](./kibana-plugin-server.savedobjectsrawdoc._seq_no.md) | <code>number</code> | |
| [\_source](./kibana-plugin-server.savedobjectsrawdoc._source.md) | <code>any</code> | |
| [\_type](./kibana-plugin-server.savedobjectsrawdoc._type.md) | <code>string</code> | |

View file

@ -0,0 +1,20 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) &gt; [(constructor)](./kibana-plugin-server.savedobjectsschema.(constructor).md)
## SavedObjectsSchema.(constructor)
Constructs a new instance of the `SavedObjectsSchema` class
<b>Signature:</b>
```typescript
constructor(schemaDefinition?: SavedObjectsSchemaDefinition);
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| schemaDefinition | <code>SavedObjectsSchemaDefinition</code> | |

View file

@ -0,0 +1,22 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) &gt; [getIndexForType](./kibana-plugin-server.savedobjectsschema.getindexfortype.md)
## SavedObjectsSchema.getIndexForType() method
<b>Signature:</b>
```typescript
getIndexForType(type: string): string | undefined;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| type | <code>string</code> | |
<b>Returns:</b>
`string | undefined`

View file

@ -0,0 +1,22 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) &gt; [isHiddenType](./kibana-plugin-server.savedobjectsschema.ishiddentype.md)
## SavedObjectsSchema.isHiddenType() method
<b>Signature:</b>
```typescript
isHiddenType(type: string): boolean;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| type | <code>string</code> | |
<b>Returns:</b>
`boolean`

View file

@ -0,0 +1,22 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) &gt; [isNamespaceAgnostic](./kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md)
## SavedObjectsSchema.isNamespaceAgnostic() method
<b>Signature:</b>
```typescript
isNamespaceAgnostic(type: string): boolean;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| type | <code>string</code> | |
<b>Returns:</b>
`boolean`

View file

@ -0,0 +1,26 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md)
## SavedObjectsSchema class
<b>Signature:</b>
```typescript
export declare class SavedObjectsSchema
```
## Constructors
| Constructor | Modifiers | Description |
| --- | --- | --- |
| [(constructor)(schemaDefinition)](./kibana-plugin-server.savedobjectsschema.(constructor).md) | | Constructs a new instance of the <code>SavedObjectsSchema</code> class |
## Methods
| Method | Modifiers | Description |
| --- | --- | --- |
| [getIndexForType(type)](./kibana-plugin-server.savedobjectsschema.getindexfortype.md) | | |
| [isHiddenType(type)](./kibana-plugin-server.savedobjectsschema.ishiddentype.md) | | |
| [isNamespaceAgnostic(type)](./kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md) | | |

View file

@ -0,0 +1,20 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) &gt; [(constructor)](./kibana-plugin-server.savedobjectsserializer.(constructor).md)
## SavedObjectsSerializer.(constructor)
Constructs a new instance of the `SavedObjectsSerializer` class
<b>Signature:</b>
```typescript
constructor(schema: SavedObjectsSchema);
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| schema | <code>SavedObjectsSchema</code> | |

View file

@ -0,0 +1,26 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) &gt; [generateRawId](./kibana-plugin-server.savedobjectsserializer.generaterawid.md)
## SavedObjectsSerializer.generateRawId() method
Given a saved object type and id, generates the compound id that is stored in the raw document.
<b>Signature:</b>
```typescript
generateRawId(namespace: string | undefined, type: string, id?: string): string;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| namespace | <code>string &#124; undefined</code> | |
| type | <code>string</code> | |
| id | <code>string</code> | |
<b>Returns:</b>
`string`

View file

@ -0,0 +1,24 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) &gt; [isRawSavedObject](./kibana-plugin-server.savedobjectsserializer.israwsavedobject.md)
## SavedObjectsSerializer.isRawSavedObject() method
Determines whether or not the raw document can be converted to a saved object.
<b>Signature:</b>
```typescript
isRawSavedObject(rawDoc: RawDoc): any;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| rawDoc | <code>RawDoc</code> | |
<b>Returns:</b>
`any`

View file

@ -0,0 +1,27 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md)
## SavedObjectsSerializer class
<b>Signature:</b>
```typescript
export declare class SavedObjectsSerializer
```
## Constructors
| Constructor | Modifiers | Description |
| --- | --- | --- |
| [(constructor)(schema)](./kibana-plugin-server.savedobjectsserializer.(constructor).md) | | Constructs a new instance of the <code>SavedObjectsSerializer</code> class |
## Methods
| Method | Modifiers | Description |
| --- | --- | --- |
| [generateRawId(namespace, type, id)](./kibana-plugin-server.savedobjectsserializer.generaterawid.md) | | Given a saved object type and id, generates the compound id that is stored in the raw document. |
| [isRawSavedObject(rawDoc)](./kibana-plugin-server.savedobjectsserializer.israwsavedobject.md) | | Determines whether or not the raw document can be converted to a saved object. |
| [rawToSavedObject(doc)](./kibana-plugin-server.savedobjectsserializer.rawtosavedobject.md) | | Converts a document from the format that is stored in elasticsearch to the saved object client format. |
| [savedObjectToRaw(savedObj)](./kibana-plugin-server.savedobjectsserializer.savedobjecttoraw.md) | | Converts a document from the saved object client format to the format that is stored in elasticsearch. |

View file

@ -0,0 +1,24 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) &gt; [rawToSavedObject](./kibana-plugin-server.savedobjectsserializer.rawtosavedobject.md)
## SavedObjectsSerializer.rawToSavedObject() method
Converts a document from the format that is stored in elasticsearch to the saved object client format.
<b>Signature:</b>
```typescript
rawToSavedObject(doc: RawDoc): SanitizedSavedObjectDoc;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| doc | <code>RawDoc</code> | |
<b>Returns:</b>
`SanitizedSavedObjectDoc`

View file

@ -0,0 +1,24 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) &gt; [savedObjectToRaw](./kibana-plugin-server.savedobjectsserializer.savedobjecttoraw.md)
## SavedObjectsSerializer.savedObjectToRaw() method
Converts a document from the saved object client format to the format that is stored in elasticsearch.
<b>Signature:</b>
```typescript
savedObjectToRaw(savedObj: SanitizedSavedObjectDoc): RawDoc;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| savedObj | <code>SanitizedSavedObjectDoc</code> | |
<b>Returns:</b>
`RawDoc`

View file

@ -103,6 +103,9 @@ export {
SavedObjectsFindOptions,
SavedObjectsFindResponse,
SavedObjectsMigrationVersion,
SavedObjectsRawDoc,
SavedObjectsSchema,
SavedObjectsSerializer,
SavedObjectsService,
SavedObjectsUpdateOptions,
SavedObjectsUpdateResponse,

View file

@ -22,3 +22,5 @@ export * from './service';
export { SavedObjectsSchema } from './schema';
export { SavedObjectsManagement } from './management';
export { SavedObjectsSerializer, RawDoc as SavedObjectsRawDoc } from './serialization';

View file

@ -634,6 +634,48 @@ export interface SavedObjectsMigrationVersion {
[pluginName: string]: string;
}
// Warning: (ae-missing-release-tag) "RawDoc" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public
export interface SavedObjectsRawDoc {
// (undocumented)
_id: string;
// (undocumented)
_primary_term?: number;
// (undocumented)
_seq_no?: number;
// (undocumented)
_source: any;
// (undocumented)
_type?: string;
}
// Warning: (ae-missing-release-tag) "SavedObjectsSchema" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export class SavedObjectsSchema {
// Warning: (ae-forgotten-export) The symbol "SavedObjectsSchemaDefinition" needs to be exported by the entry point index.d.ts
constructor(schemaDefinition?: SavedObjectsSchemaDefinition);
// (undocumented)
getIndexForType(type: string): string | undefined;
// (undocumented)
isHiddenType(type: string): boolean;
// (undocumented)
isNamespaceAgnostic(type: string): boolean;
}
// Warning: (ae-missing-release-tag) "SavedObjectsSerializer" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export class SavedObjectsSerializer {
constructor(schema: SavedObjectsSchema);
generateRawId(namespace: string | undefined, type: string, id?: string): string;
isRawSavedObject(rawDoc: SavedObjectsRawDoc): any;
// Warning: (ae-forgotten-export) The symbol "SanitizedSavedObjectDoc" needs to be exported by the entry point index.d.ts
rawToSavedObject(doc: SavedObjectsRawDoc): SanitizedSavedObjectDoc;
savedObjectToRaw(savedObj: SanitizedSavedObjectDoc): SavedObjectsRawDoc;
}
// @public (undocumented)
export interface SavedObjectsService<Request = any> {
// Warning: (ae-forgotten-export) The symbol "ScopedSavedObjectsClientProvider" needs to be exported by the entry point index.d.ts

View file

@ -94,12 +94,12 @@ describe('create()', () => {
taskManager.schedule.mockResolvedValueOnce({
id: 'task-123',
taskType: 'alerting:123',
sequenceNumber: 1,
primaryTerm: 1,
scheduledAt: new Date(),
attempts: 1,
status: 'idle',
runAt: new Date(),
startedAt: null,
retryAt: null,
state: {},
params: {},
});
@ -437,8 +437,6 @@ describe('enable()', () => {
});
taskManager.schedule.mockResolvedValueOnce({
id: 'task-123',
sequenceNumber: 1,
primaryTerm: 1,
scheduledAt: new Date(),
attempts: 0,
status: 'idle',
@ -446,6 +444,8 @@ describe('enable()', () => {
state: {},
params: {},
taskType: '',
startedAt: null,
retryAt: null,
});
await alertsClient.enable({ id: '1' });
@ -737,19 +737,8 @@ describe('delete()', () => {
savedObjectsClient.delete.mockResolvedValueOnce({
success: true,
});
taskManager.remove.mockResolvedValueOnce({
index: '.task_manager',
id: 'task-123',
sequenceNumber: 1,
primaryTerm: 1,
result: '',
});
const result = await alertsClient.delete({ id: '1' });
expect(result).toMatchInlineSnapshot(`
Object {
"success": true,
}
`);
expect(result).toEqual({ success: true });
expect(savedObjectsClient.delete).toHaveBeenCalledTimes(1);
expect(savedObjectsClient.delete.mock.calls[0]).toMatchInlineSnapshot(`
Array [

View file

@ -13,16 +13,24 @@ export const TASK_ID = `Maps-${TELEMETRY_TASK_TYPE}`;
export function scheduleTask(server, taskManager) {
const { kbnServer } = server.plugins.xpack_main.status.plugin;
kbnServer.afterPluginsInit(async () => {
try {
await taskManager.schedule({
id: TASK_ID,
taskType: TELEMETRY_TASK_TYPE,
state: { stats: {}, runs: 0 },
});
}catch(e) {
server.log(['warning', 'maps'], `Error scheduling telemetry task, received ${e.message}`);
}
kbnServer.afterPluginsInit(() => {
// The code block below can't await directly within "afterPluginsInit"
// callback due to circular dependency. The server isn't "ready" until
// this code block finishes. Migrations wait for server to be ready before
// executing. Saved objects repository waits for migrations to finish before
// finishing the request. To avoid this, we'll await within a separate
// function block.
(async () => {
try {
await taskManager.schedule({
id: TASK_ID,
taskType: TELEMETRY_TASK_TYPE,
state: { stats: {}, runs: 0 },
});
}catch(e) {
server.log(['warning', 'maps'], `Error scheduling telemetry task, received ${e.message}`);
}
})();
});
}
@ -73,5 +81,5 @@ export function getNextMidnight() {
const nextMidnight = new Date();
nextMidnight.setHours(0, 0, 0, 0);
nextMidnight.setDate(nextMidnight.getDate() + 1);
return nextMidnight.toISOString();
return nextMidnight;
}

View file

@ -25,7 +25,7 @@ describe('telemetryTaskRunner', () => {
moment()
.add(1, 'days')
.startOf('day')
.toISOString();
.toDate();
const getRunner = telemetryTaskRunner();
const runResult = await getRunner(

View file

@ -12,7 +12,7 @@ describe('getNextMidnight', () => {
const nextMidnightMoment = moment()
.add(1, 'days')
.startOf('day')
.toISOString();
.toDate();
expect(getNextMidnight()).toEqual(nextMidnightMoment);
});

View file

@ -8,5 +8,5 @@ export function getNextMidnight() {
const nextMidnight = new Date();
nextMidnight.setHours(0, 0, 0, 0);
nextMidnight.setDate(nextMidnight.getDate() + 1);
return nextMidnight.toISOString();
return nextMidnight;
}

View file

@ -29,15 +29,23 @@ export function scheduleTasks(server: HapiServer) {
const { taskManager } = server;
const { kbnServer } = server.plugins.xpack_main.status.plugin;
kbnServer.afterPluginsInit(async () => {
try {
await taskManager.schedule({
id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}`,
taskType: VIS_TELEMETRY_TASK,
state: { stats: {}, runs: 0 },
});
} catch (e) {
server.log(['warning', 'telemetry'], `Error scheduling task, received ${e.message}`);
}
kbnServer.afterPluginsInit(() => {
// The code block below can't await directly within "afterPluginsInit"
// callback due to circular dependency. The server isn't "ready" until
// this code block finishes. Migrations wait for server to be ready before
// executing. Saved objects repository waits for migrations to finish before
// finishing the request. To avoid this, we'll await within a separate
// function block.
(async () => {
try {
await taskManager.schedule({
id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}`,
taskType: VIS_TELEMETRY_TASK,
state: { stats: {}, runs: 0 },
});
} catch (e) {
server.log(['warning', 'telemetry'], `Error scheduling task, received ${e.message}`);
}
})();
});
}

View file

@ -43,7 +43,7 @@ describe('visualizationsTaskRunner', () => {
moment()
.add(1, 'days')
.startOf('day')
.toISOString();
.toDate();
const runner = visualizationsTaskRunner(mockTaskInstance, { server: mockKbnServer });
const result = await runner();

View file

@ -19,7 +19,8 @@ At a high-level, the task manager works like this:
- `attempts` is less than the configured threshold
- Attempt to claim the task by using optimistic concurrency to set:
- status to `running`
- `runAt` to now + the timeout specified by the task
- `startedAt` to now
- `retryAt` to next time task should retry if it times out and is still in `running` status
- Execute the task, if the previous claim succeeded
- If the task fails, increment the `attempts` count and reschedule it
- If the task succeeds:
@ -38,7 +39,7 @@ If a task specifies a higher `numWorkers` than the system supports, the system's
The task_manager can be configured via `taskManager` config options (e.g. `taskManager.maxAttempts`):
- `max_attempts` - How many times a failing task instance will be retried before it is never run again
- `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed
- `poll_interval` - How often the background worker should check the task_manager index for more work
- `index` - The name of the index that the task_manager
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
@ -64,11 +65,15 @@ taskManager.registerTaskDefinitions({
// Optional, human-friendly, more detailed description
description: 'Amazing!!',
// Optional, how long, in minutes, the system should wait before
// Optional, how long, in minutes or seconds, the system should wait before
// a running instance of this task is considered to be timed out.
// This defaults to 5 minutes.
timeout: '5m',
// Optional, how many attempts before marking task as failed.
// This defaults to what is configured at the task manager level.
maxAttempts: 5,
// The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots,
// 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is
// overridden by the `override_num_workers` config value, if specified.
@ -161,7 +166,7 @@ The data stored for a task instance looks something like this:
runAt: "2020-07-24T17:34:35.272Z",
// Indicates that this is a recurring task. We currently only support
// 1 minute granularity.
// minute syntax `5m` or second syntax `10s`.
interval: '5m',
// How many times this task has been unsuccesfully attempted,

View file

@ -4,8 +4,4 @@
* you may not use this file except in compliance with the Elastic License.
*/
import xPackage from '../../../package.json';
import { getTemplateVersion } from './lib/get_template_version';
export const TASK_MANAGER_API_VERSION = 1;
export const TASK_MANAGER_TEMPLATE_VERSION = getTemplateVersion(xPackage.version);
export const TASK_MANAGER_INDEX = '.kibana_task_manager';

View file

@ -4,7 +4,11 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { SavedObjectsSerializer, SavedObjectsSchema } from '../../../../src/core/server';
import { TaskManager } from './task_manager';
import mappings from './mappings.json';
import { migrations } from './migrations';
import { TASK_MANAGER_INDEX } from './constants';
export function taskManager(kibana) {
return new kibana.Plugin({
@ -16,15 +20,12 @@ export function taskManager(kibana) {
enabled: Joi.boolean().default(true),
max_attempts: Joi.number()
.description('The maximum number of times a task will be attempted before being abandoned as failed')
.min(0) // no retries
.min(1)
.default(3),
poll_interval: Joi.number()
.description('How often, in milliseconds, the task manager will look for more work.')
.min(1000)
.default(3000),
index: Joi.string()
.description('The name of the index used to store task information.')
.default('.kibana_task_manager'),
max_workers: Joi.number()
.description('The maximum number of tasks that this Kibana instance will run simultaneously.')
.min(1) // disable the task manager rather than trying to specify it with 0 workers
@ -37,8 +38,33 @@ export function taskManager(kibana) {
},
init(server) {
const config = server.config();
const taskManager = new TaskManager(this.kbnServer, server, config);
const schema = new SavedObjectsSchema(this.kbnServer.uiExports.savedObjectSchemas);
const serializer = new SavedObjectsSerializer(schema);
const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin');
const savedObjectsRepository = server.savedObjects.getSavedObjectsRepository(
callWithInternalUser,
['task']
);
const taskManager = new TaskManager({
kbnServer: this.kbnServer,
config,
savedObjectsRepository,
serializer,
});
server.decorate('server', 'taskManager', taskManager);
},
uiExports: {
mappings,
migrations,
savedObjectSchemas: {
task: {
hidden: true,
isNamespaceAgnostic: true,
indexPattern: TASK_MANAGER_INDEX,
convertToAliasScript: `ctx._id = ctx._source.type + ':' + ctx._id`,
},
},
},
});
}

View file

@ -5,7 +5,24 @@
*/
import _ from 'lodash';
import { assertValidInterval, intervalFromNow, minutesFromNow } from './intervals';
import sinon from 'sinon';
import {
assertValidInterval,
intervalFromNow,
intervalFromDate,
minutesFromNow,
minutesFromDate,
secondsFromNow,
secondsFromDate,
} from './intervals';
let fakeTimer: sinon.SinonFakeTimers;
beforeAll(() => {
fakeTimer = sinon.useFakeTimers();
});
afterAll(() => fakeTimer.restore());
describe('taskIntervals', () => {
describe('assertValidInterval', () => {
@ -13,7 +30,20 @@ describe('taskIntervals', () => {
expect(() => assertValidInterval(`${_.random(1000)}m`)).not.toThrow();
});
test('it rejects intervals are not of the form `Nm`', () => {
test('it accepts intervals in the form `Ns`', () => {
expect(() => assertValidInterval(`${_.random(1000)}s`)).not.toThrow();
});
test('it rejects 0 based intervals', () => {
expect(() => assertValidInterval('0m')).toThrow(
/Invalid interval "0m"\. Intervals must be of the form {number}m. Example: 5m/
);
expect(() => assertValidInterval('0s')).toThrow(
/Invalid interval "0s"\. Intervals must be of the form {number}m. Example: 5m/
);
});
test('it rejects intervals are not of the form `Nm` or `Ns`', () => {
expect(() => assertValidInterval(`5m 2s`)).toThrow(
/Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/
);
@ -28,10 +58,17 @@ describe('taskIntervals', () => {
const mins = _.random(1, 100);
const expected = Date.now() + mins * 60 * 1000;
const nextRun = intervalFromNow(`${mins}m`)!.getTime();
expect(Math.abs(nextRun - expected)).toBeLessThan(100);
expect(nextRun).toEqual(expected);
});
test('it rejects intervals are not of the form `Nm`', () => {
test('it returns the current date plus n seconds', () => {
const secs = _.random(1, 100);
const expected = Date.now() + secs * 1000;
const nextRun = intervalFromNow(`${secs}s`)!.getTime();
expect(nextRun).toEqual(expected);
});
test('it rejects intervals are not of the form `Nm` or `Ns`', () => {
expect(() => intervalFromNow(`5m 2s`)).toThrow(
/Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/
);
@ -39,6 +76,53 @@ describe('taskIntervals', () => {
/Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/
);
});
test('it rejects 0 based intervals', () => {
expect(() => intervalFromNow('0m')).toThrow(
/Invalid interval "0m"\. Intervals must be of the form {number}m. Example: 5m/
);
expect(() => intervalFromNow('0s')).toThrow(
/Invalid interval "0s"\. Intervals must be of the form {number}m. Example: 5m/
);
});
});
describe('intervalFromDate', () => {
test('it returns the given date plus n minutes', () => {
const originalDate = new Date(2019, 1, 1);
const mins = _.random(1, 100);
const expected = originalDate.valueOf() + mins * 60 * 1000;
const nextRun = intervalFromDate(originalDate, `${mins}m`)!.getTime();
expect(expected).toEqual(nextRun);
});
test('it returns the current date plus n seconds', () => {
const originalDate = new Date(2019, 1, 1);
const secs = _.random(1, 100);
const expected = originalDate.valueOf() + secs * 1000;
const nextRun = intervalFromDate(originalDate, `${secs}s`)!.getTime();
expect(expected).toEqual(nextRun);
});
test('it rejects intervals are not of the form `Nm` or `Ns`', () => {
const date = new Date();
expect(() => intervalFromDate(date, `5m 2s`)).toThrow(
/Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/
);
expect(() => intervalFromDate(date, `hello`)).toThrow(
/Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/
);
});
test('it rejects 0 based intervals', () => {
const date = new Date();
expect(() => intervalFromDate(date, '0m')).toThrow(
/Invalid interval "0m"\. Intervals must be of the form {number}m. Example: 5m/
);
expect(() => intervalFromDate(date, '0s')).toThrow(
/Invalid interval "0s"\. Intervals must be of the form {number}m. Example: 5m/
);
});
});
describe('minutesFromNow', () => {
@ -46,7 +130,36 @@ describe('taskIntervals', () => {
const mins = _.random(1, 100);
const expected = Date.now() + mins * 60 * 1000;
const nextRun = minutesFromNow(mins).getTime();
expect(Math.abs(nextRun - expected)).toBeLessThan(100);
expect(nextRun).toEqual(expected);
});
});
describe('minutesFromDate', () => {
test('it returns the given date plus a number of minutes', () => {
const originalDate = new Date(2019, 1, 1);
const mins = _.random(1, 100);
const expected = originalDate.valueOf() + mins * 60 * 1000;
const nextRun = minutesFromDate(originalDate, mins).getTime();
expect(expected).toEqual(nextRun);
});
});
describe('secondsFromNow', () => {
test('it returns the current date plus a number of seconds', () => {
const secs = _.random(1, 100);
const expected = Date.now() + secs * 1000;
const nextRun = secondsFromNow(secs).getTime();
expect(nextRun).toEqual(expected);
});
});
describe('secondsFromDate', () => {
test('it returns the given date plus a number of seconds', () => {
const originalDate = new Date(2019, 1, 1);
const secs = _.random(1, 100);
const expected = originalDate.valueOf() + secs * 1000;
const nextRun = secondsFromDate(originalDate, secs).getTime();
expect(expected).toEqual(nextRun);
});
});
});

View file

@ -6,7 +6,7 @@
/**
* Returns a date that is the specified interval from now. Currently,
* only minute-intervals are supported.
* only minute-intervals and second-intervals are supported.
*
* @param {string} interval - An interval of the form `Nm` such as `5m`
*/
@ -17,29 +17,91 @@ export function intervalFromNow(interval?: string): Date | undefined {
assertValidInterval(interval);
if (isSeconds(interval)) {
return secondsFromNow(parseInterval(interval));
}
return minutesFromNow(parseInterval(interval));
}
/**
* Returns a date that is the specified interval from given date. Currently,
* only minute-intervals and second-intervals are supported.
*
* @param {Date} date - The date to add interval to
* @param {string} interval - An interval of the form `Nm` such as `5m`
*/
export function intervalFromDate(date: Date, interval?: string): Date | undefined {
if (interval === undefined) {
return;
}
assertValidInterval(interval);
if (isSeconds(interval)) {
return secondsFromDate(date, parseInterval(interval));
}
return minutesFromDate(date, parseInterval(interval));
}
/**
* Returns a date that is mins minutes from now.
*
* @param mins The number of mintues from now
*/
export function minutesFromNow(mins: number): Date {
const now = new Date();
return minutesFromDate(new Date(), mins);
}
now.setMinutes(now.getMinutes() + mins);
/**
* Returns a date that is mins minutes from given date.
*
* @param date The date to add minutes to
* @param mins The number of mintues from given date
*/
export function minutesFromDate(date: Date, mins: number): Date {
const result = new Date(date.valueOf());
return now;
result.setMinutes(result.getMinutes() + mins);
return result;
}
/**
* Returns a date that is secs seconds from now.
*
* @param secs The number of seconds from now
*/
export function secondsFromNow(secs: number): Date {
return secondsFromDate(new Date(), secs);
}
/**
* Returns a date that is secs seconds from given date.
*
* @param date The date to add seconds to
* @param secs The number of seconds from given date
*/
export function secondsFromDate(date: Date, secs: number): Date {
const result = new Date(date.valueOf());
result.setSeconds(result.getSeconds() + secs);
return result;
}
/**
* Verifies that the specified interval matches our expected format.
*
* @param {string} interval - An interval such as `5m`
* @param {string} interval - An interval such as `5m` or `10s`
*/
export function assertValidInterval(interval: string) {
if (/^[0-9]+m$/.test(interval)) {
if (isMinutes(interval)) {
return interval;
}
if (isSeconds(interval)) {
return interval;
}
@ -51,3 +113,11 @@ export function assertValidInterval(interval: string) {
function parseInterval(interval: string) {
return parseInt(interval, 10);
}
function isMinutes(interval: string) {
return /^[1-9][0-9]*m$/.test(interval);
}
function isSeconds(interval: string) {
return /^[1-9][0-9]*s$/.test(interval);
}

View file

@ -26,6 +26,8 @@ const getMockConcreteTaskInstance = () => {
status: TaskStatus;
runAt: Date;
scheduledAt: Date;
startedAt: Date | null;
retryAt: Date | null;
state: any;
taskType: string;
params: any;
@ -37,6 +39,8 @@ const getMockConcreteTaskInstance = () => {
status: 'idle',
runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()),
scheduledAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()),
startedAt: null,
retryAt: null,
state: {},
taskType: 'nice_task',
params: { abc: 'def' },
@ -153,9 +157,11 @@ Object {
"abc": "def",
},
"primaryTerm": 1,
"retryAt": null,
"runAt": 2018-09-18T05:33:09.588Z,
"scheduledAt": 2018-09-18T05:33:09.588Z,
"sequenceNumber": 1,
"startedAt": null,
"state": Object {},
"status": "idle",
"taskType": "nice_task",

View file

@ -0,0 +1,42 @@
{
"task": {
"properties": {
"taskType": {
"type": "keyword"
},
"scheduledAt": {
"type": "date"
},
"runAt": {
"type": "date"
},
"startedAt": {
"type": "date"
},
"retryAt": {
"type": "date"
},
"interval": {
"type": "text"
},
"attempts": {
"type": "integer"
},
"status": {
"type": "keyword"
},
"params": {
"type": "text"
},
"state": {
"type": "text"
},
"user": {
"type": "keyword"
},
"scope": {
"type": "keyword"
}
}
}
}

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SavedObject } from 'src/core/server';
export const migrations = {
task: {
'7.4.0': (doc: SavedObject) => ({
...doc,
updated_at: new Date().toISOString(),
}),
},
};

View file

@ -94,13 +94,25 @@ export interface TaskDefinition {
description?: string;
/**
* How long, in minutes, the system should wait for the task to complete
* How long, in minutes or seconds, the system should wait for the task to complete
* before it is considered to be timed out. (e.g. '5m', the default). If
* the task takes longer than this, Kibana will send it a kill command and
* the task will be re-attempted.
*/
timeout?: string;
/**
* Up to how many times the task should retry when it fails to run. This will
* default to the global variable.
*/
maxAttempts?: number;
/**
* Function that returns the delay in seconds to wait before attempting the
* failed task again.
*/
getRetryDelay?: (attempts: number, error: object) => number;
/**
* The numer of workers / slots a running instance of this task occupies.
* This defaults to 1.
@ -126,10 +138,14 @@ export const validateTaskDefinition = Joi.object({
title: Joi.string().optional(),
description: Joi.string().optional(),
timeout: Joi.string().default('5m'),
maxAttempts: Joi.number()
.min(1)
.optional(),
numWorkers: Joi.number()
.min(1)
.default(1),
createTaskRunner: Joi.func().required(),
getRetryDelay: Joi.func().optional(),
}).default();
/**
@ -164,6 +180,19 @@ export interface TaskInstance {
*/
scheduledAt?: Date;
/**
* The date and time that this task started execution. This is used to determine
* the "real" runAt that ended up running the task. This value is only set
* when status is set to "running".
*/
startedAt?: Date | null;
/**
* The date and time that this task should re-execute if stuck in "running" / timeout
* status. This value is only set when status is set to "running".
*/
retryAt?: Date | null;
/**
* The date and time that this task is scheduled to be run. It is not
* guaranteed to run at this time, but it is guaranteed not to run earlier
@ -212,14 +241,9 @@ export interface ConcreteTaskInstance extends TaskInstance {
id: string;
/**
* The sequence number from the Elaticsearch document.
* The saved object version from the Elaticsearch document.
*/
sequenceNumber: number;
/**
* The primary term from the Elaticsearch document.
*/
primaryTerm: number;
version?: string;
/**
* The date and time that this task was originally scheduled. This is used
@ -244,6 +268,19 @@ export interface ConcreteTaskInstance extends TaskInstance {
*/
runAt: Date;
/**
* The date and time that this task started execution. This is used to determine
* the "real" runAt that ended up running the task. This value is only set
* when status is set to "running".
*/
startedAt: Date | null;
/**
* The date and time that this task should re-execute if stuck in "running" / timeout
* status. This value is only set when status is set to "running".
*/
retryAt: Date | null;
/**
* The state passed into the task's run function, and returned by the previous
* run. If there was no previous run, or if the previous run did not return

View file

@ -7,6 +7,11 @@
import _ from 'lodash';
import sinon from 'sinon';
import { TaskManager } from './task_manager';
import { SavedObjectsClientMock } from 'src/core/server/mocks';
import { SavedObjectsSerializer, SavedObjectsSchema } from 'src/core/server';
const savedObjectsClient = SavedObjectsClientMock.create();
const serializer = new SavedObjectsSerializer(new SavedObjectsSchema());
describe('TaskManager', () => {
let clock: sinon.SinonFakeTimers;
@ -28,7 +33,12 @@ describe('TaskManager', () => {
test('disallows schedule before init', async () => {
const { opts } = testOpts();
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
const client = new TaskManager({
kbnServer: opts.kbnServer,
config: opts.config,
savedObjectsRepository: savedObjectsClient,
serializer,
});
const task = {
taskType: 'foo',
params: {},
@ -39,19 +49,34 @@ describe('TaskManager', () => {
test('disallows fetch before init', async () => {
const { opts } = testOpts();
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
const client = new TaskManager({
kbnServer: opts.kbnServer,
config: opts.config,
savedObjectsRepository: savedObjectsClient,
serializer,
});
await expect(client.fetch({})).rejects.toThrow(/^NotInitialized: .*/i);
});
test('disallows remove before init', async () => {
const { opts } = testOpts();
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
const client = new TaskManager({
kbnServer: opts.kbnServer,
config: opts.config,
savedObjectsRepository: savedObjectsClient,
serializer,
});
await expect(client.remove('23')).rejects.toThrow(/^NotInitialized: .*/i);
});
test('allows middleware registration before init', () => {
const { opts } = testOpts();
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
const client = new TaskManager({
kbnServer: opts.kbnServer,
config: opts.config,
savedObjectsRepository: savedObjectsClient,
serializer,
});
const middleware = {
beforeSave: async (saveOpts: any) => saveOpts,
beforeRun: async (runOpts: any) => runOpts,
@ -61,7 +86,12 @@ describe('TaskManager', () => {
test('disallows middleware registration after init', async () => {
const { $test, opts } = testOpts();
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
const client = new TaskManager({
kbnServer: opts.kbnServer,
config: opts.config,
savedObjectsRepository: savedObjectsClient,
serializer,
});
const middleware = {
beforeSave: async (saveOpts: any) => saveOpts,
beforeRun: async (runOpts: any) => runOpts,
@ -94,20 +124,23 @@ describe('TaskManager', () => {
afterPluginsInit(callback: any) {
$test.afterPluginsInit = callback;
},
},
server: {
log: sinon.spy(),
decorate(...args: any[]) {
_.set(opts, args.slice(0, -1), _.last(args));
},
plugins: {
elasticsearch: {
getCluster() {
return { callWithInternalUser: callCluster };
},
status: {
on(eventName: string, callback: () => any) {
$test.events[eventName] = callback;
server: {
log: sinon.spy(),
decorate(...args: any[]) {
_.set(opts, args.slice(0, -1), _.last(args));
},
kibanaMigrator: {
awaitMigration: jest.fn(),
},
plugins: {
elasticsearch: {
getCluster() {
return { callWithInternalUser: callCluster };
},
status: {
on(eventName: string, callback: () => any) {
$test.events[eventName] = callback;
},
},
},
},

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { SavedObjectsClientContract, SavedObjectsSerializer } from 'src/core/server';
import { fillPool } from './lib/fill_pool';
import { Logger, TaskManagerLogger } from './lib/logger';
import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware';
@ -13,7 +14,14 @@ import { SanitizedTaskDefinition, TaskDefinition, TaskDictionary } from './task'
import { TaskPoller } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { FetchOpts, FetchResult, RemoveResult, TaskStore } from './task_store';
import { FetchOpts, FetchResult, TaskStore } from './task_store';
export interface TaskManagerOpts {
kbnServer: any;
config: any;
savedObjectsRepository: SavedObjectsClientContract;
serializer: SavedObjectsSerializer;
}
/*
* The TaskManager is the public interface into the task manager system. This glues together
@ -46,9 +54,10 @@ export class TaskManager {
* enabling the task manipulation methods, and beginning the background polling
* mechanism.
*/
constructor(kbnServer: any, server: any, config: any) {
this.maxWorkers = config.get('xpack.task_manager.max_workers');
this.overrideNumWorkers = config.get('xpack.task_manager.override_num_workers');
constructor(opts: TaskManagerOpts) {
const { server } = opts.kbnServer;
this.maxWorkers = opts.config.get('xpack.task_manager.max_workers');
this.overrideNumWorkers = opts.config.get('xpack.task_manager.override_num_workers');
this.definitions = {};
const logger = new TaskManagerLogger((...args: any[]) => server.log(...args));
@ -56,12 +65,11 @@ export class TaskManager {
/* Kibana UUID needs to be pulled live (not cached), as it takes a long time
* to initialize, and can change after startup */
const store = new TaskStore({
serializer: opts.serializer,
savedObjectsRepository: opts.savedObjectsRepository,
callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser,
index: config.get('xpack.task_manager.index'),
maxAttempts: config.get('xpack.task_manager.max_attempts'),
supportedTypes: Object.keys(this.definitions),
logger,
getKibanaUuid: () => config.get('server.uuid'),
maxAttempts: opts.config.get('xpack.task_manager.max_attempts'),
definitions: this.definitions,
});
const pool = new TaskPool({
logger,
@ -70,7 +78,7 @@ export class TaskManager {
const createRunner = (instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
logger,
kbnServer,
kbnServer: opts.kbnServer,
instance,
store,
definitions: this.definitions,
@ -78,8 +86,7 @@ export class TaskManager {
});
const poller = new TaskPoller({
logger,
pollInterval: config.get('xpack.task_manager.poll_interval'),
store,
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
work(): Promise<void> {
return fillPool(pool.run, store.fetchAvailableTasks, createRunner);
},
@ -89,25 +96,25 @@ export class TaskManager {
this.store = store;
this.poller = poller;
kbnServer.afterPluginsInit(async () => {
store.addSupportedTypes(Object.keys(this.definitions));
const startPoller = () => {
return poller
.start()
.then(() => {
this.isInitialized = true;
})
.catch((err: Error) => {
// FIXME: check the type of error to make sure it's actually an ES error
logger.warning(`PollError ${err.message}`);
opts.kbnServer.afterPluginsInit(() => {
// By this point, the plugins had their chance to register task definitions
// and we're good to start doing CRUD actions
this.isInitialized = true;
const startPoller = async () => {
await server.kibanaMigrator.awaitMigration();
try {
await poller.start();
} catch (err) {
// FIXME: check the type of error to make sure it's actually an ES error
logger.warning(`PollError ${err.message}`);
// rety again to initialize store and poller, using the timing of
// task_manager's configurable poll interval
const retryInterval = config.get('xpack.task_manager.poll_interval');
setTimeout(() => startPoller(), retryInterval);
});
// rety again to initialize store and poller, using the timing of
// task_manager's configurable poll interval
const retryInterval = opts.config.get('xpack.task_manager.poll_interval');
setTimeout(() => startPoller(), retryInterval);
}
};
return startPoller();
startPoller();
});
}
@ -180,7 +187,7 @@ export class TaskManager {
* @param {string} id
* @returns {Promise<RemoveResult>}
*/
public async remove(id: string): Promise<RemoveResult> {
public async remove(id: string): Promise<void> {
this.assertInitialized('Tasks cannot be removed before task manager is initialized!');
return this.store.remove(id);
}

View file

@ -7,24 +7,12 @@
import _ from 'lodash';
import sinon from 'sinon';
import { TaskPoller } from './task_poller';
import { TaskStore } from './task_store';
import { mockLogger, resolvable, sleep } from './test_utils';
let store: TaskStore;
describe('TaskPoller', () => {
beforeEach(() => {
const callCluster = sinon.stub();
callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} }));
const getKibanaUuid = sinon.stub().returns('kibana-123-uuid-test');
store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
});
});
describe('interval tests', () => {
@ -44,7 +32,6 @@ describe('TaskPoller', () => {
return Promise.resolve();
});
const poller = new TaskPoller({
store,
pollInterval,
work,
logger: mockLogger(),
@ -67,7 +54,6 @@ describe('TaskPoller', () => {
const logger = mockLogger();
const doneWorking = resolvable();
const poller = new TaskPoller({
store,
logger,
pollInterval: 1,
work: async () => {
@ -98,7 +84,6 @@ describe('TaskPoller', () => {
});
const poller = new TaskPoller({
store,
logger: mockLogger(),
pollInterval: 1,
work,
@ -117,7 +102,6 @@ describe('TaskPoller', () => {
await doneWorking;
});
const poller = new TaskPoller({
store,
pollInterval: 1,
logger: mockLogger(),
work,
@ -143,7 +127,6 @@ describe('TaskPoller', () => {
doneWorking.resolve();
});
const poller = new TaskPoller({
store,
pollInterval: 1,
logger: mockLogger(),
work,
@ -154,19 +137,4 @@ describe('TaskPoller', () => {
sinon.assert.calledOnce(work);
});
test('start method passes through error from store.init', async () => {
store.init = () => {
throw new Error('test error');
};
const poller = new TaskPoller({
store,
pollInterval: 1,
logger: mockLogger(),
work: sinon.stub(),
});
await expect(poller.start()).rejects.toMatchInlineSnapshot(`[Error: test error]`);
});
});

View file

@ -9,14 +9,12 @@
*/
import { Logger } from './lib/logger';
import { TaskStore } from './task_store';
type WorkFn = () => Promise<void>;
interface Opts {
pollInterval: number;
logger: Logger;
store: TaskStore;
work: WorkFn;
}
@ -30,7 +28,6 @@ export class TaskPoller {
private timeout: any;
private pollInterval: number;
private logger: Logger;
private store: TaskStore;
private work: WorkFn;
/**
@ -44,7 +41,6 @@ export class TaskPoller {
constructor(opts: Opts) {
this.pollInterval = opts.pollInterval;
this.logger = opts.logger;
this.store = opts.store;
this.work = opts.work;
}
@ -56,10 +52,6 @@ export class TaskPoller {
return;
}
if (!this.store.isInitialized) {
await this.store.init();
}
this.isStarted = true;
const poll = async () => {

View file

@ -6,10 +6,18 @@
import _ from 'lodash';
import sinon from 'sinon';
import { minutesFromNow } from './lib/intervals';
import { minutesFromNow, secondsFromNow } from './lib/intervals';
import { ConcreteTaskInstance } from './task';
import { TaskManagerRunner } from './task_runner';
let fakeTimer: sinon.SinonFakeTimers;
beforeAll(() => {
fakeTimer = sinon.useFakeTimers();
});
afterAll(() => fakeTimer.restore());
describe('TaskManagerRunner', () => {
test('provides details about the task that is running', () => {
const { runner } = testOpts({
@ -53,7 +61,7 @@ describe('TaskManagerRunner', () => {
state: { hey: 'there' },
},
definitions: {
testtype: {
bar: {
createTaskRunner: () => ({
async run() {
throw new Error('Dangit!');
@ -69,8 +77,7 @@ describe('TaskManagerRunner', () => {
const instance = store.update.args[0][0];
expect(instance.id).toEqual(id);
expect(instance.attempts).toEqual(initialAttempts + 1);
expect(instance.runAt.getTime()).toBeGreaterThan(Date.now());
expect(instance.runAt.getTime()).toEqual(minutesFromNow(initialAttempts * 5).getTime());
expect(instance.params).toEqual({ a: 'b' });
expect(instance.state).toEqual({ hey: 'there' });
});
@ -79,6 +86,17 @@ describe('TaskManagerRunner', () => {
const { runner, store } = testOpts({
instance: {
interval: '10m',
status: 'running',
startedAt: new Date(),
},
definitions: {
bar: {
createTaskRunner: () => ({
async run() {
return;
},
}),
},
},
});
@ -165,7 +183,9 @@ describe('TaskManagerRunner', () => {
bar: {
createTaskRunner: () => ({
async run() {
await new Promise(r => setTimeout(r, 1000));
const promise = new Promise(r => setTimeout(r, 1000));
fakeTimer.tick(1000);
await promise;
},
async cancel() {
wasCancelled = true;
@ -176,7 +196,7 @@ describe('TaskManagerRunner', () => {
});
const promise = runner.run();
await new Promise(r => setInterval(r, 1));
await Promise.resolve();
await runner.cancel();
await promise;
@ -187,7 +207,7 @@ describe('TaskManagerRunner', () => {
test('warns if cancel is called on a non-cancellable task', async () => {
const { runner, logger } = testOpts({
definitions: {
testType: {
bar: {
createTaskRunner: () => ({
run: async () => undefined,
}),
@ -202,6 +222,166 @@ describe('TaskManagerRunner', () => {
sinon.assert.calledWithMatch(logger.warning, /not cancellable/);
});
test('sets startedAt, status, attempts and retryAt when claiming a task', async () => {
const timeoutMinutes = 1;
const id = _.random(1, 20).toString();
const initialAttempts = _.random(0, 2);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});
await runner.claimOwnership();
sinon.assert.calledOnce(store.update);
const instance = store.update.args[0][0];
expect(instance.attempts).toEqual(initialAttempts + 1);
expect(instance.status).toBe('running');
expect(instance.startedAt.getTime()).toEqual(Date.now());
expect(instance.retryAt.getTime()).toEqual(
minutesFromNow((initialAttempts + 1) * 5).getTime() + timeoutMinutes * 60 * 1000
);
});
test('uses getRetryDelay function on error when defined', async () => {
const initialAttempts = _.random(0, 2);
const retryDelay = _.random(15, 100);
const id = Date.now().toString();
const getRetryDelayStub = sinon.stub().returns(retryDelay);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
},
definitions: {
bar: {
getRetryDelay: getRetryDelayStub,
createTaskRunner: () => ({
async run() {
throw error;
},
}),
},
},
});
await runner.run();
sinon.assert.calledOnce(store.update);
sinon.assert.calledWith(getRetryDelayStub, initialAttempts, error);
const instance = store.update.args[0][0];
expect(instance.runAt.getTime()).toEqual(secondsFromNow(retryDelay).getTime());
});
test('uses getRetryDelay to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(0, 2);
const retryDelay = _.random(15, 100);
const timeoutMinutes = 1;
const getRetryDelayStub = sinon.stub().returns(retryDelay);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetryDelay: getRetryDelayStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});
await runner.claimOwnership();
sinon.assert.calledOnce(store.update);
sinon.assert.calledWith(getRetryDelayStub, initialAttempts + 1);
const instance = store.update.args[0][0];
expect(instance.retryAt.getTime()).toEqual(
secondsFromNow(retryDelay).getTime() + timeoutMinutes * 60 * 1000
);
});
test('Fails non-recurring task when maxAttempts reached', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = 3;
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
maxAttempts: 3,
createTaskRunner: () => ({
run: async () => {
throw new Error();
},
}),
},
},
});
await runner.run();
sinon.assert.calledOnce(store.update);
const instance = store.update.args[0][0];
expect(instance.attempts).toEqual(3);
expect(instance.status).toEqual('failed');
expect(instance.retryAt).toBeNull();
expect(instance.runAt.getTime()).toBeLessThanOrEqual(Date.now());
});
test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => {
const id = _.random(1, 20).toString();
const initialAttempts = 3;
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: '10s',
},
definitions: {
bar: {
maxAttempts: 3,
createTaskRunner: () => ({
run: async () => {
throw new Error();
},
}),
},
},
});
await runner.run();
sinon.assert.calledOnce(store.update);
const instance = store.update.args[0][0];
expect(instance.attempts).toEqual(3);
expect(instance.status).toEqual('idle');
expect(instance.runAt.getTime()).toEqual(minutesFromNow(15).getTime());
});
interface TestOpts {
instance?: Partial<ConcreteTaskInstance>;
definitions?: any;
@ -234,6 +414,8 @@ describe('TaskManagerRunner', () => {
primaryTerm: 32,
runAt: new Date(),
scheduledAt: new Date(),
startedAt: null,
retryAt: null,
attempts: 0,
params: {},
scope: ['reporting'],

View file

@ -11,7 +11,8 @@
*/
import Joi from 'joi';
import { intervalFromNow, minutesFromNow } from './lib/intervals';
import Boom from 'boom';
import { intervalFromDate, intervalFromNow } from './lib/intervals';
import { Logger } from './lib/logger';
import { BeforeRunFunction } from './lib/middleware';
import {
@ -23,7 +24,8 @@ import {
TaskDictionary,
validateRunResult,
} from './task';
import { RemoveResult } from './task_store';
const defaultBackoffPerFailure = 5 * 60 * 1000;
export interface TaskRunner {
numWorkers: number;
@ -37,7 +39,7 @@ export interface TaskRunner {
interface Updatable {
readonly maxAttempts: number;
update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance>;
remove(id: string): Promise<RemoveResult>;
remove(id: string): Promise<void>;
}
interface Opts {
@ -119,7 +121,7 @@ export class TaskManagerRunner implements TaskRunner {
* Gets whether or not this task has run longer than its expiration setting allows.
*/
public get isExpired() {
return this.instance.runAt < new Date();
return intervalFromDate(this.instance.startedAt!, this.definition.timeout)! < new Date();
}
/**
@ -166,12 +168,20 @@ export class TaskManagerRunner implements TaskRunner {
*/
public async claimOwnership(): Promise<boolean> {
const VERSION_CONFLICT_STATUS = 409;
const attempts = this.instance.attempts + 1;
const now = new Date();
const timeoutDate = intervalFromNow(this.definition.timeout!)!;
try {
this.instance = await this.store.update({
...this.instance,
status: 'running',
runAt: intervalFromNow(this.definition.timeout)!,
startedAt: now,
attempts,
retryAt: new Date(
timeoutDate.getTime() + this.getRetryDelay(attempts, Boom.clientTimeout())
),
});
return true;
@ -211,19 +221,21 @@ export class TaskManagerRunner implements TaskRunner {
private async processResultForRecurringTask(result: RunResult): Promise<RunResult> {
// recurring task: update the task instance
const startedAt = this.instance.startedAt!;
const state = result.state || this.instance.state || {};
const status = this.instance.attempts < this.store.maxAttempts ? 'idle' : 'failed';
const status = this.getInstanceStatus();
let runAt;
if (status === 'failed') {
// task run errored, keep the same runAt
runAt = this.instance.runAt;
} else if (result.runAt) {
runAt = result.runAt;
} else if (result.error) {
// when result.error is truthy, then we're retrying because it failed
runAt = new Date(Date.now() + this.getRetryDelay(this.instance.attempts, result.error));
} else {
runAt =
result.runAt ||
intervalFromNow(this.instance.interval) ||
// when result.error is truthy, then we're retrying because it failed
minutesFromNow((this.instance.attempts + 1) * 5); // incrementally backs off an extra 5m per failure
runAt = intervalFromDate(startedAt, this.instance.interval)!;
}
await this.store.update({
@ -231,7 +243,9 @@ export class TaskManagerRunner implements TaskRunner {
runAt,
state,
status,
attempts: result.error ? this.instance.attempts + 1 : 0,
startedAt: null,
retryAt: null,
attempts: result.error ? this.instance.attempts : 0,
});
return result;
@ -262,6 +276,22 @@ export class TaskManagerRunner implements TaskRunner {
}
return result;
}
private getInstanceStatus() {
if (this.instance.interval) {
return 'idle';
}
const maxAttempts = this.definition.maxAttempts || this.store.maxAttempts;
return this.instance.attempts < maxAttempts ? 'idle' : 'failed';
}
private getRetryDelay(attempts: number, error: any) {
if (this.definition.getRetryDelay) {
return this.definition.getRetryDelay(attempts, error) * 1000;
}
return attempts * defaultBackoffPerFailure;
}
}
function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance {

View file

@ -6,104 +6,73 @@
import _ from 'lodash';
import sinon from 'sinon';
import {
TASK_MANAGER_API_VERSION as API_VERSION,
TASK_MANAGER_TEMPLATE_VERSION as TEMPLATE_VERSION,
} from './constants';
import { TaskInstance, TaskStatus } from './task';
import { TaskDictionary, SanitizedTaskDefinition, TaskInstance, TaskStatus } from './task';
import { FetchOpts, TaskStore } from './task_store';
import { mockLogger } from './test_utils';
import { SavedObjectsClientMock } from 'src/core/server/mocks';
import { SavedObjectsSerializer, SavedObjectsSchema, SavedObjectAttributes } from 'src/core/server';
const getKibanaUuid = sinon.stub().returns('kibana-uuid-123-test');
const taskDefinitions: TaskDictionary<SanitizedTaskDefinition> = {
report: {
type: 'report',
title: '',
numWorkers: 1,
createTaskRunner: jest.fn(),
},
dernstraight: {
type: 'dernstraight',
title: '',
numWorkers: 1,
createTaskRunner: jest.fn(),
},
yawn: {
type: 'yawn',
title: '',
numWorkers: 1,
createTaskRunner: jest.fn(),
},
};
const savedObjectsClient = SavedObjectsClientMock.create();
const serializer = new SavedObjectsSerializer(new SavedObjectsSchema());
beforeEach(() => jest.resetAllMocks());
const mockedDate = new Date('2019-02-12T21:01:22.479Z');
(global as any).Date = class Date {
constructor() {
return mockedDate;
}
static now() {
return mockedDate.getTime();
}
};
describe('TaskStore', () => {
describe('init', () => {
test('creates the task manager index', async () => {
const callCluster = sinon.stub();
callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} }));
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
});
await store.init();
sinon.assert.calledTwice(callCluster); // store.init calls twice: once to check for existing template, once to put the template (if needed)
sinon.assert.calledWithMatch(callCluster, 'indices.putTemplate', {
body: {
index_patterns: ['tasky'],
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
},
},
name: 'tasky',
});
});
test('logs a warning if newer index template exists', async () => {
const callCluster = sinon.stub();
callCluster
.withArgs('indices.getTemplate')
.returns(Promise.resolve({ tasky: { version: Infinity } }));
const logger = {
info: sinon.spy(),
debug: sinon.spy(),
warning: sinon.spy(),
error: sinon.spy(),
};
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger,
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
});
await store.init();
const loggingCall = logger.warning.getCall(0);
expect(loggingCall.args[0]).toBe(
`This Kibana instance defines an older template version (${TEMPLATE_VERSION}) than is currently in Elasticsearch (Infinity). ` +
`Because of the potential for non-backwards compatible changes, this Kibana instance will only be able to claim scheduled tasks with ` +
`"kibana.apiVersion" <= ${API_VERSION} in the task metadata.`
);
expect(logger.warning.calledOnce).toBe(true);
});
});
describe('schedule', () => {
async function testSchedule(task: TaskInstance) {
const callCluster = sinon.stub();
callCluster.withArgs('index').returns(
Promise.resolve({
_id: 'testid',
_seq_no: 3344,
_primary_term: 3344,
const callCluster = jest.fn();
savedObjectsClient.create.mockImplementation(
async (type: string, attributes: SavedObjectAttributes) => ({
id: 'testid',
type,
attributes,
references: [],
version: '123',
})
);
callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} }));
const store = new TaskStore({
serializer,
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['report', 'dernstraight', 'yawn'],
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
await store.init();
const result = await store.schedule(task);
sinon.assert.calledThrice(callCluster);
expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
return { result, callCluster, arg: callCluster.args[2][1] };
return result;
}
test('serializes the params and state', async () => {
@ -112,18 +81,42 @@ describe('TaskStore', () => {
state: { foo: 'bar' },
taskType: 'report',
};
const { callCluster, arg } = await testSchedule(task);
const result = await testSchedule(task);
sinon.assert.calledWith(callCluster, 'index');
expect(arg).toMatchObject({
index: 'tasky',
body: {
task: {
params: JSON.stringify(task.params),
state: JSON.stringify(task.state),
},
expect(savedObjectsClient.create).toHaveBeenCalledWith(
'task',
{
attempts: 0,
interval: undefined,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
scope: undefined,
startedAt: null,
state: '{"foo":"bar"}',
status: 'idle',
taskType: 'report',
user: undefined,
},
{}
);
expect(result).toEqual({
id: 'testid',
attempts: 0,
interval: undefined,
params: { hello: 'world' },
retryAt: null,
runAt: mockedDate,
scheduledAt: mockedDate,
scope: undefined,
startedAt: null,
state: { foo: 'bar' },
status: 'idle',
taskType: 'report',
user: undefined,
version: '123',
});
});
@ -133,26 +126,27 @@ describe('TaskStore', () => {
state: { foo: 'bar' },
taskType: 'report',
};
const { result } = await testSchedule(task);
const result = await testSchedule(task);
expect(result).toMatchObject({
...task,
sequenceNumber: 3344,
primaryTerm: 3344,
id: 'testid',
});
});
test('sets runAt to now if not specified', async () => {
const now = Date.now();
const { arg } = await testSchedule({ taskType: 'dernstraight', params: {}, state: {} });
expect(arg.body.task.runAt.getTime()).toBeGreaterThanOrEqual(now);
await testSchedule({ taskType: 'dernstraight', params: {}, state: {} });
expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
const attributes = savedObjectsClient.create.mock.calls[0][1];
expect(new Date(attributes.runAt as string).getTime()).toEqual(mockedDate.getTime());
});
test('ensures params and state are not null', async () => {
const { arg } = await testSchedule({ taskType: 'yawn' } as any);
expect(arg.body.task.params).toEqual('{}');
expect(arg.body.task.state).toEqual('{}');
await testSchedule({ taskType: 'yawn' } as any);
expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
const attributes = savedObjectsClient.create.mock.calls[0][1];
expect(attributes.params).toEqual('{}');
expect(attributes.state).toEqual('{}');
});
test('errors if the task type is unknown', async () => {
@ -166,12 +160,11 @@ describe('TaskStore', () => {
async function testFetch(opts?: FetchOpts, hits: any[] = []) {
const callCluster = sinon.spy(async () => ({ hits: { hits } }));
const store = new TaskStore({
serializer,
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
const result = await store.fetch(opts);
@ -188,7 +181,7 @@ describe('TaskStore', () => {
test('empty call filters by type, sorts by runAt and id', async () => {
const { args } = await testFetch();
expect(args).toMatchObject({
index: 'tasky',
index: '.kibana_task_manager',
body: {
sort: [{ 'task.runAt': 'asc' }, { _id: 'desc' }],
query: { term: { type: 'task' } },
@ -301,13 +294,14 @@ describe('TaskStore', () => {
interval: undefined,
params: { hello: 'world' },
runAt,
scheduledAt: mockedDate,
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'idle',
taskType: 'foo',
user: 'jimbo',
sequenceNumber: undefined,
primaryTerm: undefined,
retryAt: undefined,
startedAt: undefined,
},
{
attempts: 2,
@ -315,13 +309,14 @@ describe('TaskStore', () => {
interval: '5m',
params: { shazm: 1 },
runAt,
scheduledAt: mockedDate,
scope: ['reporting', 'ceo'],
state: { henry: 'The 8th' },
status: 'running',
taskType: 'bar',
user: 'dabo',
sequenceNumber: undefined,
primaryTerm: undefined,
retryAt: undefined,
startedAt: undefined,
},
],
searchAfter: ['b', 2],
@ -335,9 +330,9 @@ describe('TaskStore', () => {
const store = new TaskStore({
callCluster,
logger: mockLogger(),
supportedTypes: ['a', 'b', 'c'],
index: 'tasky',
definitions: taskDefinitions,
maxAttempts: 2,
serializer,
...opts,
});
@ -355,12 +350,11 @@ describe('TaskStore', () => {
test('it returns normally with no tasks when the index does not exist.', async () => {
const callCluster = sinon.spy(async () => ({ hits: { hits: [] } }));
const store = new TaskStore({
serializer,
callCluster,
getKibanaUuid,
logger: mockLogger(),
supportedTypes: ['a', 'b', 'c'],
index: 'tasky',
definitions: taskDefinitions,
maxAttempts: 2,
savedObjectsRepository: savedObjectsClient,
});
const result = await store.fetchAvailableTasks();
@ -373,12 +367,25 @@ describe('TaskStore', () => {
test('it filters tasks by supported types, maxAttempts, and runAt', async () => {
const maxAttempts = _.random(2, 43);
const index = `index_${_.random(1, 234)}`;
const customMaxAttempts = _.random(44, 100);
const { args } = await testFetchAvailableTasks({
opts: {
index,
maxAttempts,
supportedTypes: ['foo', 'bar'],
definitions: {
foo: {
type: 'foo',
title: '',
numWorkers: 1,
createTaskRunner: jest.fn(),
},
bar: {
type: 'bar',
title: '',
numWorkers: 1,
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
},
},
});
expect(args).toMatchObject({
@ -390,10 +397,63 @@ describe('TaskStore', () => {
{
bool: {
must: [
{ terms: { 'task.taskType': ['foo', 'bar'] } },
{ range: { 'task.attempts': { lte: maxAttempts } } },
{ range: { 'task.runAt': { lte: 'now' } } },
{ range: { 'kibana.apiVersion': { lte: 1 } } },
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{ term: { 'task.status': 'running' } },
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.interval' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
@ -401,10 +461,18 @@ describe('TaskStore', () => {
},
},
size: 10,
sort: { 'task.runAt': { order: 'asc' } },
sort: {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'expression',
source: `doc['task.retryAt'].value || doc['task.runAt'].value`,
},
},
},
seq_no_primary_term: true,
},
index,
});
});
@ -428,6 +496,8 @@ describe('TaskStore', () => {
scope: ['reporting'],
},
},
_seq_no: 1,
_primary_term: 2,
sort: ['a', 1],
},
{
@ -446,6 +516,8 @@ describe('TaskStore', () => {
scope: ['reporting', 'ceo'],
},
},
_seq_no: 3,
_primary_term: 4,
sort: ['b', 2],
},
],
@ -462,8 +534,6 @@ describe('TaskStore', () => {
status: 'idle',
taskType: 'foo',
user: 'jimbo',
sequenceNumber: undefined,
primaryTerm: undefined,
},
{
attempts: 2,
@ -476,8 +546,6 @@ describe('TaskStore', () => {
status: 'running',
taskType: 'bar',
user: 'dabo',
sequenceNumber: undefined,
primaryTerm: undefined,
},
]);
});
@ -485,60 +553,70 @@ describe('TaskStore', () => {
describe('update', () => {
test('refreshes the index, handles versioning', async () => {
const runAt = new Date();
const task = {
runAt,
scheduledAt: runAt,
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id: 'task:324242',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
sequenceNumber: 2,
primaryTerm: 2,
attempts: 3,
status: 'idle' as TaskStatus,
version: '123',
};
const callCluster = sinon.spy(async () => ({
_seq_no: task.sequenceNumber + 1,
_primary_term: task.primaryTerm + 1,
}));
savedObjectsClient.update.mockImplementation(
async (type: string, id: string, attributes: SavedObjectAttributes) => {
return {
id,
type,
attributes,
references: [],
version: '123',
};
}
);
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
const result = await store.update(task);
sinon.assert.calledOnce(callCluster);
sinon.assert.calledWith(callCluster, 'update');
expect(callCluster.args[0][1]).toMatchObject({
id: task.id,
index: 'tasky',
if_seq_no: 2,
if_primary_term: 2,
refresh: true,
body: {
doc: {
task: {
..._.omit(task, ['id', 'sequenceNumber', 'primaryTerm']),
params: JSON.stringify(task.params),
state: JSON.stringify(task.state),
},
},
expect(savedObjectsClient.update).toHaveBeenCalledWith(
'task',
task.id,
{
attempts: task.attempts,
interval: undefined,
params: JSON.stringify(task.params),
retryAt: null,
runAt: task.runAt.toISOString(),
scheduledAt: mockedDate.toISOString(),
scope: undefined,
startedAt: null,
state: JSON.stringify(task.state),
status: task.status,
taskType: task.taskType,
user: undefined,
},
});
{ version: '123' }
);
expect(result).toEqual({
...task,
sequenceNumber: 3,
primaryTerm: 3,
interval: undefined,
retryAt: null,
scope: undefined,
startedAt: null,
user: undefined,
version: '123',
});
});
});
@ -546,41 +624,17 @@ describe('TaskStore', () => {
describe('remove', () => {
test('removes the task with the specified id', async () => {
const id = `id-${_.random(1, 20)}`;
const callCluster = sinon.spy(() =>
Promise.resolve({
_index: 'myindex',
_id: id,
_seq_no: 32,
_primary_term: 32,
result: 'deleted',
})
);
const callCluster = jest.fn();
const store = new TaskStore({
serializer,
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'myindex',
maxAttempts: 2,
supportedTypes: ['a'],
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
const result = await store.remove(id);
sinon.assert.calledOnce(callCluster);
sinon.assert.calledWith(callCluster, 'delete');
expect(result).toEqual({
id,
index: 'myindex',
sequenceNumber: 32,
primaryTerm: 32,
result: 'deleted',
});
expect(callCluster.args[0][1]).toMatchObject({
id,
index: 'myindex',
refresh: true,
});
expect(result).toBeUndefined();
expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id);
});
});
});

View file

@ -8,20 +8,29 @@
* This module contains helpers for managing the task manager storage layer.
*/
import { omit } from 'lodash';
import {
TASK_MANAGER_API_VERSION as API_VERSION,
TASK_MANAGER_TEMPLATE_VERSION as TEMPLATE_VERSION,
} from './constants';
import { Logger } from './lib/logger';
import { ConcreteTaskInstance, ElasticJs, TaskInstance, TaskStatus } from './task';
SavedObjectsClientContract,
SavedObject,
SavedObjectAttributes,
SavedObjectsSerializer,
SavedObjectsRawDoc,
} from 'src/core/server';
import {
ConcreteTaskInstance,
ElasticJs,
SanitizedTaskDefinition,
TaskDictionary,
TaskInstance,
} from './task';
import { TASK_MANAGER_INDEX } from './constants';
export interface StoreOpts {
callCluster: ElasticJs;
getKibanaUuid: () => string;
index: string;
maxAttempts: number;
supportedTypes: string[];
logger: Logger;
definitions: TaskDictionary<SanitizedTaskDefinition>;
savedObjectsRepository: SavedObjectsClientContract;
serializer: SavedObjectsSerializer;
}
export interface FetchOpts {
@ -35,224 +44,57 @@ export interface FetchResult {
docs: ConcreteTaskInstance[];
}
export interface RemoveResult {
index: string;
id: string;
sequenceNumber: number;
primaryTerm: number;
result: string;
}
// Internal, the raw document, as stored in the Kibana index.
export interface RawTaskDoc {
_id: string;
_index: string;
_seq_no: number;
_primary_term: number;
_source: {
type: string;
kibana: {
uuid: string;
version: number;
apiVersion: number;
};
task: {
taskType: string;
scheduledAt: Date;
runAt: Date;
interval?: string;
attempts: number;
status: TaskStatus;
params: string;
state: string;
user?: string;
scope?: string[];
};
};
}
/**
* Wraps an elasticsearch connection and provides a task manager-specific
* interface into the index.
*/
export class TaskStore {
public readonly maxAttempts: number;
public getKibanaUuid: () => string;
public readonly index: string;
private callCluster: ElasticJs;
private supportedTypes: string[];
private _isInitialized = false; // eslint-disable-line @typescript-eslint/camelcase
private logger: Logger;
private definitions: TaskDictionary<SanitizedTaskDefinition>;
private savedObjectsRepository: SavedObjectsClientContract;
private serializer: SavedObjectsSerializer;
/**
* Constructs a new TaskStore.
* @param {StoreOpts} opts
* @prop {CallCluster} callCluster - The elastic search connection
* @prop {string} index - The name of the task manager index
* @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned
* @prop {string[]} supportedTypes - The task types supported by this store
* @prop {Logger} logger - The task manager logger.
* @prop {TaskDefinition} definition - The definition of the task being run
* @prop {serializer} - The saved object serializer
* @prop {savedObjectsRepository} - An instance to the saved objects repository
*/
constructor(opts: StoreOpts) {
this.callCluster = opts.callCluster;
this.index = opts.index;
this.maxAttempts = opts.maxAttempts;
this.supportedTypes = opts.supportedTypes;
this.logger = opts.logger;
this.getKibanaUuid = opts.getKibanaUuid;
this.definitions = opts.definitions;
this.serializer = opts.serializer;
this.savedObjectsRepository = opts.savedObjectsRepository;
this.fetchAvailableTasks = this.fetchAvailableTasks.bind(this);
}
public addSupportedTypes(types: string[]) {
if (!this._isInitialized) {
this.supportedTypes = this.supportedTypes.concat(types);
} else {
throw new Error('Cannot add task types after initialization');
}
}
/**
* Initializes the store, ensuring the task manager index template is created
* and the version is up to date.
*/
public async init() {
if (this._isInitialized) {
throw new Error('TaskStore has already been initialized!');
}
let existingVersion = -Infinity;
const templateName = this.index;
try {
// check if template exists
const templateCheck = await this.callCluster('indices.getTemplate', {
name: templateName,
filter_path: '*.version',
});
// extract the existing version
const template = templateCheck[templateName] || {};
existingVersion = template.version || 0;
} catch (err) {
if (err.statusCode !== 404) {
throw err; // ignore not found
}
}
if (existingVersion > TEMPLATE_VERSION) {
// Do not trample a newer version template
this.logger.warning(
`This Kibana instance defines an older template version (${TEMPLATE_VERSION}) than is currently in Elasticsearch (${existingVersion}). ` +
`Because of the potential for non-backwards compatible changes, this Kibana instance will only be able to claim scheduled tasks with ` +
`"kibana.apiVersion" <= ${API_VERSION} in the task metadata.`
);
return;
} else if (existingVersion === TEMPLATE_VERSION) {
// The latest template is already saved, so just log a debug line.
this.logger.debug(
`Not installing ${this.index} index template: version ${TEMPLATE_VERSION} already exists.`
);
return;
}
// Activate template creation / update
if (existingVersion > 0) {
this.logger.info(
`Upgrading ${this.index} index template. Old version: ${existingVersion}, New version: ${TEMPLATE_VERSION}.`
);
} else {
this.logger.info(`Installing ${this.index} index template version: ${TEMPLATE_VERSION}.`);
}
const templateResult = await this.callCluster('indices.putTemplate', {
name: templateName,
body: {
index_patterns: [this.index],
mappings: {
dynamic: false,
properties: {
type: { type: 'keyword' },
task: {
properties: {
taskType: { type: 'keyword' },
scheduledAt: { type: 'date' },
runAt: { type: 'date' },
interval: { type: 'text' },
attempts: { type: 'integer' },
status: { type: 'keyword' },
params: { type: 'text' },
state: { type: 'text' },
user: { type: 'keyword' },
scope: { type: 'keyword' },
},
},
kibana: {
properties: {
apiVersion: { type: 'integer' }, // 1, 2, 3, etc
uuid: { type: 'keyword' }, //
version: { type: 'integer' }, // 7000099, etc
},
},
},
},
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
},
version: TEMPLATE_VERSION,
},
});
this._isInitialized = true;
this.logger.info(
`Installed ${this.index} index template: version ${TEMPLATE_VERSION} (API version ${API_VERSION})`
);
return templateResult;
}
public get isInitialized() {
return this._isInitialized;
}
/**
* Schedules a task.
*
* @param task - The task being scheduled.
*/
public async schedule(taskInstance: TaskInstance): Promise<ConcreteTaskInstance> {
if (!this._isInitialized) {
await this.init();
}
if (!this.supportedTypes.includes(taskInstance.taskType)) {
if (!this.definitions[taskInstance.taskType]) {
throw new Error(
`Unsupported task type "${
taskInstance.taskType
}". Supported types are ${this.supportedTypes.join(', ')}`
`Unsupported task type "${taskInstance.taskType}". Supported types are ${Object.keys(
this.definitions
).join(', ')}`
);
}
const { id, ...body } = rawSource(taskInstance, this);
const result = await this.callCluster('index', {
id,
body,
index: this.index,
refresh: true,
});
const savedObject = await this.savedObjectsRepository.create(
'task',
taskInstanceToAttributes(taskInstance),
{ id: taskInstance.id }
);
const { task } = body;
return {
...taskInstance,
id: result._id,
sequenceNumber: result._seq_no,
primaryTerm: result._primary_term,
attempts: 0,
status: task.status,
scheduledAt: task.scheduledAt,
runAt: task.runAt,
state: taskInstance.state || {},
};
return savedObjectToConcreteTaskInstance(savedObject);
}
/**
@ -285,15 +127,66 @@ export class TaskStore {
query: {
bool: {
must: [
{ terms: { 'task.taskType': this.supportedTypes } },
{ range: { 'task.attempts': { lte: this.maxAttempts } } },
{ range: { 'task.runAt': { lte: 'now' } } },
{ range: { 'kibana.apiVersion': { lte: API_VERSION } } },
// Either a task with idle status and runAt <= now or
// status running with a retryAt <= now.
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{ term: { 'task.status': 'running' } },
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
// Either task has an interval or the attempts < the maximum configured
{
bool: {
should: [
{ exists: { field: 'task.interval' } },
...Object.entries(this.definitions).map(([type, definition]) => ({
bool: {
must: [
{ term: { 'task.taskType': type } },
{
range: {
'task.attempts': {
lt: definition.maxAttempts || this.maxAttempts,
},
},
},
],
},
})),
],
},
},
],
},
},
size: 10,
sort: { 'task.runAt': { order: 'asc' } },
sort: {
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'expression',
source: `doc['task.retryAt'].value || doc['task.runAt'].value`,
},
},
},
seq_no_primary_term: true,
});
@ -308,26 +201,14 @@ export class TaskStore {
* @returns {Promise<TaskDoc>}
*/
public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
const rawDoc = taskDocToRaw(doc, this);
const updatedSavedObject = await this.savedObjectsRepository.update(
'task',
doc.id,
taskInstanceToAttributes(doc),
{ version: doc.version }
);
const result = await this.callCluster('update', {
body: {
doc: rawDoc._source,
},
id: doc.id,
index: this.index,
if_seq_no: doc.sequenceNumber,
if_primary_term: doc.primaryTerm,
// The refresh is important so that if we immediately look for work,
// we don't pick up this task.
refresh: true,
});
return {
...doc,
sequenceNumber: result._seq_no,
primaryTerm: result._primary_term,
};
return savedObjectToConcreteTaskInstance(updatedSavedObject);
}
/**
@ -336,22 +217,8 @@ export class TaskStore {
* @param {string} id
* @returns {Promise<void>}
*/
public async remove(id: string): Promise<RemoveResult> {
const result = await this.callCluster('delete', {
id,
index: this.index,
// The refresh is important so that if we immediately look for work,
// we don't pick up this task.
refresh: true,
});
return {
index: result._index,
id: result._id,
sequenceNumber: result._seq_no,
primaryTerm: result._primary_term,
result: result.result,
};
public async remove(id: string): Promise<void> {
await this.savedObjectsRepository.delete('task', id);
}
private async search(opts: any = {}): Promise<FetchResult> {
@ -362,7 +229,7 @@ export class TaskStore {
: queryOnlyTasks;
const result = await this.callCluster('search', {
index: this.index,
index: TASK_MANAGER_INDEX,
ignoreUnavailable: true,
body: {
...opts,
@ -373,7 +240,10 @@ export class TaskStore {
const rawDocs = result.hits.hits;
return {
docs: (rawDocs as RawTaskDoc[]).map(rawToTaskDoc),
docs: (rawDocs as SavedObjectsRawDoc[])
.map(doc => this.serializer.rawToSavedObject(doc))
.map(doc => omit(doc, 'namespace') as SavedObject)
.map(savedObjectToConcreteTaskInstance),
searchAfter: (rawDocs.length && rawDocs[rawDocs.length - 1].sort) || [],
};
}
@ -393,62 +263,38 @@ function paginatableSort(sort: any[] = []) {
return [...sort, sortById];
}
function rawSource(doc: TaskInstance, store: TaskStore) {
const { id, ...taskFields } = doc;
const source = {
...taskFields,
function taskInstanceToAttributes(doc: TaskInstance): SavedObjectAttributes {
return {
...omit(doc, 'id', 'version'),
params: JSON.stringify(doc.params || {}),
state: JSON.stringify(doc.state || {}),
attempts: (doc as ConcreteTaskInstance).attempts || 0,
scheduledAt: doc.scheduledAt || new Date(),
runAt: doc.runAt || new Date(),
scheduledAt: (doc.scheduledAt || new Date()).toISOString(),
startedAt: (doc.startedAt && doc.startedAt.toISOString()) || null,
retryAt: (doc.retryAt && doc.retryAt.toISOString()) || null,
runAt: (doc.runAt || new Date()).toISOString(),
status: (doc as ConcreteTaskInstance).status || 'idle',
};
}
delete (source as any).id;
delete (source as any).sequenceNumber;
delete (source as any).primaryTerm;
delete (source as any).type;
function savedObjectToConcreteTaskInstance(savedObject: SavedObject): ConcreteTaskInstance {
return {
id,
type: 'task',
task: source,
kibana: {
uuid: store.getKibanaUuid(), // needs to be pulled live
version: TEMPLATE_VERSION,
apiVersion: API_VERSION,
},
...savedObject.attributes,
id: savedObject.id,
version: savedObject.version,
scheduledAt: new Date(savedObject.attributes.scheduledAt),
runAt: new Date(savedObject.attributes.runAt),
startedAt: savedObject.attributes.startedAt && new Date(savedObject.attributes.startedAt),
retryAt: savedObject.attributes.retryAt && new Date(savedObject.attributes.retryAt),
state: parseJSONField(savedObject.attributes.state, 'state', savedObject.id),
params: parseJSONField(savedObject.attributes.params, 'params', savedObject.id),
};
}
function taskDocToRaw(doc: ConcreteTaskInstance, store: TaskStore): RawTaskDoc {
const { type, task, kibana } = rawSource(doc, store);
return {
_id: doc.id,
_index: store.index,
_source: { type, task, kibana },
_seq_no: doc.sequenceNumber,
_primary_term: doc.primaryTerm,
};
}
function rawToTaskDoc(doc: RawTaskDoc): ConcreteTaskInstance {
return {
...doc._source.task,
id: doc._id,
sequenceNumber: doc._seq_no,
primaryTerm: doc._primary_term,
params: parseJSONField(doc._source.task.params, 'params', doc),
state: parseJSONField(doc._source.task.state, 'state', doc),
};
}
function parseJSONField(json: string, fieldName: string, doc: RawTaskDoc) {
function parseJSONField(json: string, fieldName: string, id: string) {
try {
return json ? JSON.parse(json) : {};
} catch (error) {
throw new Error(`Task "${doc._id}"'s ${fieldName} field has invalid JSON: ${json}`);
throw new Error(`Task "${id}"'s ${fieldName} field has invalid JSON: ${json}`);
}
}

View file

@ -33,7 +33,7 @@ export default function createAlertTests({ getService }: KibanaFunctionalTestDef
async function getScheduledTask(id: string) {
return await es.get({
id,
id: `task:${id}`,
index: '.kibana_task_manager',
});
}

View file

@ -106,7 +106,7 @@ export default function ({ getService }) {
const [scheduledTask] = (await currentTasks()).docs;
expect(scheduledTask.id).to.eql(task.id);
expect(scheduledTask.attempts).to.be.greaterThan(0);
expect(Date.parse(scheduledTask.runAt)).to.be.greaterThan(Date.parse(task.runAt));
expect(Date.parse(scheduledTask.runAt)).to.be.greaterThan(Date.parse(task.runAt) + 5 * 60 * 1000);
});
});