Add support to Task Manager for validating and versioning the task state objects (#159048)

Part of https://github.com/elastic/kibana/issues/155764.

In this PR, I'm modifying task manager to allow task types to report a
versioned schema for the `state` object. When defining
`stateSchemaByVersion`, the following will happen:
- The `state` returned from the task runner is validated against the
latest version, and an error is thrown if it is invalid (to catch
mismatches at development and testing time)
- When task manager reads a task, it migrates the task state to the
latest version (if necessary) and validates it against the latest
schema, dropping any unknown fields (e.g., after a downgrade).
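Conceptually, the write-path check amounts to validating the runner's returned state against the latest registered schema. A minimal self-contained sketch (the real implementation uses `@kbn/config-schema` objects inside task manager; the `validate` shape below is a hypothetical stand-in):

```typescript
// Hypothetical stand-in for a versioned schema entry; the real code uses
// @kbn/config-schema ObjectType instances.
interface VersionedSchema {
  validate: (state: Record<string, unknown>) => Record<string, unknown>;
}

// Validate against the highest registered version, throwing on mismatch so
// problems surface at development and testing time.
function validateStateOnWrite(
  stateSchemaByVersion: Record<number, VersionedSchema>,
  state: Record<string, unknown>
): Record<string, unknown> {
  const latest = Math.max(...Object.keys(stateSchemaByVersion).map(Number));
  return stateSchemaByVersion[latest].validate(state);
}

// Example v1 schema: a numeric `runs` field and nothing else, mirroring the
// "[foo]: definition for this key is missing" error seen in scenario 2 below.
const exampleSchemas: Record<number, VersionedSchema> = {
  1: {
    validate: (state) => {
      for (const key of Object.keys(state)) {
        if (key !== 'runs') {
          throw new Error(`[${key}]: definition for this key is missing`);
        }
      }
      if (typeof state.runs !== 'number') {
        throw new Error('[runs]: expected value of type [number]');
      }
      return { runs: state.runs };
    },
  },
};
```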

By default, Task Manager validates the state on write once a versioned
schema is provided. On read, validation failures are only fatal when
`xpack.task_manager.allow_reading_invalid_state: false` is set; it
defaults to `true`, which logs the failure and lets the read proceed. We
plan to enforce read validation in serverless by default but will be
cautious on existing deployments and wait for telemetry to show no
issues.
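The read path described above can be sketched as: run every `up` migration newer than the task's stored `stateVersion`, validate against the latest schema, and either throw or log-and-proceed depending on the flag. A simplified, dependency-free sketch (names and the `validate` shape are illustrative, not the actual task manager internals):

```typescript
interface StateVersionEntry {
  up: (state: Record<string, unknown>) => Record<string, unknown>;
  // Throws on a type mismatch; returns the state with unknown fields dropped.
  validate: (state: Record<string, unknown>) => Record<string, unknown>;
}

function migrateAndValidateOnRead(
  stateSchemaByVersion: Record<number, StateVersionEntry>,
  state: Record<string, unknown>,
  stateVersion: number | undefined,
  allowReadingInvalidState: boolean,
  debug: (msg: string) => void
): Record<string, unknown> {
  const versions = Object.keys(stateSchemaByVersion)
    .map(Number)
    .sort((a, b) => a - b);
  let migrated = state;
  for (const version of versions) {
    // Tasks without a stateVersion predate versioning and run all migrations.
    if (stateVersion === undefined || version > stateVersion) {
      migrated = stateSchemaByVersion[version].up(migrated);
    }
  }
  try {
    return stateSchemaByVersion[versions[versions.length - 1]].validate(migrated);
  } catch (e) {
    if (allowReadingInvalidState) {
      debug(`Failed to validate the task's state. Allowing read to proceed. ${e}`);
      return migrated;
    }
    throw e;
  }
}

// Hypothetical v1 entry mirroring the alerts_invalidate_api_keys example.
const readExample: Record<number, StateVersionEntry> = {
  1: {
    up: (state) => ({
      runs: state.runs || 0,
      total_invalidated: state.total_invalidated || 0,
    }),
    validate: (state) => {
      if (typeof state.runs !== 'number') {
        throw new Error('[runs]: expected value of type [number]');
      }
      if (typeof state.total_invalidated !== 'number') {
        throw new Error('[total_invalidated]: expected value of type [number]');
      }
      return { runs: state.runs, total_invalidated: state.total_invalidated };
    },
  },
};
```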

I've onboarded the `alerts_invalidate_api_keys` task type, which can be
used as an example to onboard others. See [this
commit](214bae38d8).

### How to configure a task type to version and validate
The structure is defined as:
```
taskManager.registerTaskDefinitions({
  ...
  stateSchemaByVersion: {
    1: {
      // All existing tasks don't have a version so will get `up` migrated to 1
      up: (state: Record<string, unknown>) => ({
        runs: state.runs || 0,
        total_invalidated: state.total_invalidated || 0,
      }),
      schema: schema.object({
        runs: schema.number(),
        total_invalidated: schema.number(),
      }),
    },
  },
  ...
});
```

However, see [this
commit](214bae38d8)
for an example of how to leverage type safety from the schema.
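The type-safety idea in that commit is to derive the runner's state type from the latest schema via `TypeOf<typeof latestSchema>` from `@kbn/config-schema`, so any drift between the schema and the code fails compilation. A dependency-free sketch of the same idea, deriving the type from the migration function instead (illustrative, not the Kibana API):

```typescript
const versionedState = {
  1: {
    up: (state: Record<string, unknown>) => ({
      runs: typeof state.runs === 'number' ? state.runs : 0,
      total_invalidated:
        typeof state.total_invalidated === 'number' ? state.total_invalidated : 0,
    }),
  },
};

// The latest state shape, derived from the version map itself: adding a field
// to `up` automatically updates the type the task runner compiles against.
type LatestTaskStateSchema = ReturnType<(typeof versionedState)[1]['up']>;

const updatedState: LatestTaskStateSchema = {
  runs: 1,
  total_invalidated: 0,
  // foo: true, // <-- would fail to compile, catching scenario-2 bugs early
};
```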

### Follow up issues
- Onboard non-alerting task types to have a versioned state schema
(https://github.com/elastic/kibana/issues/159342)
- Onboard alerting task types to have a versioned state schema for the
framework fields (https://github.com/elastic/kibana/issues/159343)
- Onboard alerting task types to have a versioned rule and alert state
schema within the task state
(https://github.com/elastic/kibana/issues/159344)
- Telemetry on the validation failures
(https://github.com/elastic/kibana/issues/159345)
- Remove feature flag so `allow_reading_invalid_state` is always `false`
(https://github.com/elastic/kibana/issues/159346)
- Force validation on all tasks using state by removing the exemption
code (https://github.com/elastic/kibana/issues/159347)
- Release tasks when encountering a validation failure after run
(https://github.com/elastic/kibana/issues/159964)

### To Verify

NOTE: I have the following verification scenarios in a jest integration
test as well =>
https://github.com/elastic/kibana/pull/159048/files#diff-5f06228df58fa74d5a0f2722c30f1f4bee2ee9df7a14e0700b9aa9bc3864a858.

You will need to log the state when the task runs to observe what the
task runner receives in different scenarios.

```
diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
index 1e624bcd807..4aa4c2c7805 100644
--- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
+++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
@@ -140,6 +140,7 @@ function taskRunner(
 ) {
   return ({ taskInstance }: RunContext) => {
     const state = taskInstance.state as LatestTaskStateSchema;
+    console.log('*** Running task with the following state:', JSON.stringify(state));
     return {
       async run() {
         let totalInvalidated = 0;
```

#### Scenario 1: An unknown field added to the task saved object gets
dropped
1. Start up a fresh Kibana instance
2. Make the following call to Elasticsearch (I used Postman). This call
adds an unknown property (`foo`) to the task state and makes the task
run right away.
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z",
      "state": "{\"runs\":1,\"total_invalidated\":0,\"foo\":true}"
    }
  }
}
```
3. Observe the task run log message, with state not containing `foo`.

#### Scenario 2: A task runner returning an unknown property causes the
task to fail to update
1. Apply the following changes to the code (and ignore TypeScript
issues)
```
diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
index 1e624bcd807..b15d4a4f478 100644
--- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
+++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
@@ -183,6 +183,7 @@ function taskRunner(
 
           const updatedState: LatestTaskStateSchema = {
             runs: (state.runs || 0) + 1,
+            foo: true,
             total_invalidated: totalInvalidated,
           };
           return {
```
2. Make the task run right away by calling Elasticsearch with the
following
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z"
    }
  }
}
```
3. Notice the validation error logged at the error level
```
[ERROR][plugins.taskManager] Task alerts_invalidate_api_keys "Alerts-alerts_invalidate_api_keys" failed: Error: [foo]: definition for this key is missing
```

#### Scenario 3: Task state gets migrated
1. Apply the following code change
```
diff --git a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
index 1e624bcd807..338f21bed5b 100644
--- a/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
+++ b/x-pack/plugins/alerting/server/invalidate_pending_api_keys/task.ts
@@ -41,6 +41,18 @@ const stateSchemaByVersion = {
       total_invalidated: schema.number(),
     }),
   },
+  2: {
+    up: (state: Record<string, unknown>) => ({
+      runs: state.runs,
+      total_invalidated: state.total_invalidated,
+      foo: true,
+    }),
+    schema: schema.object({
+      runs: schema.number(),
+      total_invalidated: schema.number(),
+      foo: schema.boolean(),
+    }),
+  },
 };
 
 const latestSchema = stateSchemaByVersion[1].schema;
```

2. Make the task run right away by calling Elasticsearch with the
following
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z"
    }
  }
}
```
3. Observe that the state now contains the `foo` property when the task runs.

#### Scenario 4: Reading invalid state causes debug logs
1. Run the following request to Elasticsearch
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z",
      "state": "{}"
    }
  }
}
```
2. Observe the Kibana debug log mentioning the validation failure while
letting the task through
```
[DEBUG][plugins.taskManager] [alerts_invalidate_api_keys][Alerts-alerts_invalidate_api_keys] Failed to validate the task's state. Allowing read operation to proceed because allow_reading_invalid_state is true. Error: [runs]: expected value of type [number] but got [undefined]
```

#### Scenario 5: Reading invalid state when setting
`allow_reading_invalid_state: false` causes tasks to fail to run
1. Set `xpack.task_manager.allow_reading_invalid_state: false` in your
kibana.yml settings
2. Run the following request to Elasticsearch
```
POST http://kibana_system:changeme@localhost:9200/.kibana_task_manager/_update/task:Alerts-alerts_invalidate_api_keys
{
  "doc": {
    "task": {
      "runAt": "2023-06-08T00:00:00.000Z",
      "state": "{}"
    }
  }
}
```
3. Observe the Kibana error log mentioning the validation failure
```
[ERROR][plugins.taskManager] Failed to poll for work: Error: [runs]: expected value of type [number] but got [undefined]
```

NOTE: While corrupting a task directly is rare, we plan to re-queue
tasks that failed to read, leveraging work from
https://github.com/elastic/kibana/issues/159302 in a future PR (which is
why the yml config defaults to allowing invalid reads).

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Ying Mao <ying.mao@elastic.co>
Commit 40c2afdc58 (parent 498e8a625b) by Mike Côté, 2023-06-23 10:41:55 -04:00, committed by GitHub. 35 changed files with 1595 additions and 131 deletions.

View file

@ -12,6 +12,7 @@ import {
KibanaRequest,
SavedObjectsClientContract,
} from '@kbn/core/server';
import { schema, TypeOf } from '@kbn/config-schema';
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
import { InvalidateAPIKeysParams, SecurityPluginStart } from '@kbn/security-plugin/server';
import {
@ -29,6 +30,22 @@ import { InvalidatePendingApiKey } from '../types';
const TASK_TYPE = 'alerts_invalidate_api_keys';
export const TASK_ID = `Alerts-${TASK_TYPE}`;
const stateSchemaByVersion = {
1: {
up: (state: Record<string, unknown>) => ({
runs: state.runs || 0,
total_invalidated: state.total_invalidated || 0,
}),
schema: schema.object({
runs: schema.number(),
total_invalidated: schema.number(),
}),
},
};
const latestSchema = stateSchemaByVersion[1].schema;
type LatestTaskStateSchema = TypeOf<typeof latestSchema>;
const invalidateAPIKeys = async (
params: InvalidateAPIKeysParams,
securityPluginStart?: SecurityPluginStart
@ -65,17 +82,21 @@ export async function scheduleApiKeyInvalidatorTask(
) {
const interval = config.invalidateApiKeysTask.interval;
try {
const state: LatestTaskStateSchema = {
runs: 0,
total_invalidated: 0,
};
await taskManager.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
schedule: {
interval,
},
state: {},
state,
params: {},
});
} catch (e) {
logger.debug(`Error scheduling task, received ${e.message}`);
logger.error(`Error scheduling ${TASK_ID} task, received ${e.message}`);
}
}
@ -88,6 +109,7 @@ function registerApiKeyInvalidatorTaskDefinition(
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Invalidate alert API Keys',
stateSchemaByVersion,
createTaskRunner: taskRunner(logger, coreStartServices, config),
},
});
@ -117,7 +139,7 @@ function taskRunner(
config: AlertingConfig
) {
return ({ taskInstance }: RunContext) => {
const { state } = taskInstance;
const state = taskInstance.state as LatestTaskStateSchema;
return {
async run() {
let totalInvalidated = 0;
@ -159,22 +181,24 @@ function taskRunner(
hasApiKeysPendingInvalidation = apiKeysToInvalidate.total > PAGE_SIZE;
} while (hasApiKeysPendingInvalidation);
const updatedState: LatestTaskStateSchema = {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
};
return {
state: {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
},
state: updatedState,
schedule: {
interval: config.invalidateApiKeysTask.interval,
},
};
} catch (e) {
logger.warn(`Error executing alerting apiKey invalidation task: ${e.message}`);
const updatedState: LatestTaskStateSchema = {
runs: state.runs + 1,
total_invalidated: totalInvalidated,
};
return {
state: {
runs: (state.runs || 0) + 1,
total_invalidated: totalInvalidated,
},
state: updatedState,
schedule: {
interval: config.invalidateApiKeysTask.interval,
},

View file

@ -92,6 +92,20 @@ export class Plugin {
// can add significant load to the ES cluster, so please use this configuration only when absolutly necesery.
maxConcurrency: 1,
// To ensure the validity of task state during read and write operations, utilize the stateSchemaByVersion configuration. This functionality validates the state before executing a task. Make sure to define the schema property using the @kbn/config-schema plugin, specifically as an ObjectType (schema.object) at the top level.
stateSchemaByVersion: {
1: {
schema: schema.object({
count: schema.number(),
}),
up: (state) => {
return {
count: state.count || 0,
};
},
}
}
// The createTaskRunner function / method returns an object that is responsible for
// performing the work of the task. context: { taskInstance }, is documented below.
createTaskRunner(context) {

View file

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { PublicMethodsOf } from '@kbn/utility-types';
import { BufferedTaskStore } from './buffered_task_store';
const createBufferedTaskStoreMock = () => {
const mocked: jest.Mocked<PublicMethodsOf<BufferedTaskStore>> = {
update: jest.fn(),
remove: jest.fn(),
};
return mocked;
};
export const bufferedTaskStoreMock = {
create: createBufferedTaskStoreMock,
};

View file

@ -29,8 +29,36 @@ describe('Buffered Task Store', () => {
taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);
expect(await bufferedStore.update(task)).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
expect(await bufferedStore.update(task, { validate: true })).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task], { validate: false });
expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledTimes(1);
expect(taskStore.taskValidator.getValidatedTaskInstanceFromReading).toHaveBeenCalledTimes(1);
expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(
task,
{ validate: true }
);
expect(taskStore.taskValidator.getValidatedTaskInstanceFromReading).toHaveBeenCalledWith(
task,
{ validate: true }
);
});
test(`doesn't validate when specified`, async () => {
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});
const task = taskManagerMock.createTask();
taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);
expect(await bufferedStore.update(task, { validate: false })).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task], { validate: false });
expect(taskStore.taskValidator.getValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(
task,
{ validate: false }
);
});
test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
@ -58,9 +86,9 @@ describe('Buffered Task Store', () => {
]);
const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
bufferedStore.update(tasks[0], { validate: true }),
bufferedStore.update(tasks[1], { validate: true }),
bufferedStore.update(tasks[2], { validate: true }),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(`
@ -105,10 +133,10 @@ describe('Buffered Task Store', () => {
]);
const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
bufferedStore.update(tasks[3]),
bufferedStore.update(tasks[0], { validate: true }),
bufferedStore.update(tasks[1], { validate: true }),
bufferedStore.update(tasks[2], { validate: true }),
bufferedStore.update(tasks[3], { validate: true }),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(`

View file

@ -17,14 +17,31 @@ const DEFAULT_BUFFER_MAX_DURATION = 50;
export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance>;
constructor(private readonly taskStore: TaskStore, options: BufferOptions) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance>((docs) => taskStore.bulkUpdate(docs), {
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
});
this.bufferedUpdate = createBuffer<ConcreteTaskInstance>(
// Setting validate: false because we'll validate per update call
//
// Ideally we could accumulate the "validate" options and pass them
// to .bulkUpdate per doc, but the required changes to the bulk_operation_buffer
// to track the values are high and deffered for now.
(docs) => taskStore.bulkUpdate(docs, { validate: false }),
{
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
}
);
}
public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
return unwrapPromise(this.bufferedUpdate(doc));
public async update(
doc: ConcreteTaskInstance,
options: { validate: boolean }
): Promise<ConcreteTaskInstance> {
const docToUpdate = this.taskStore.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
const result = await unwrapPromise(this.bufferedUpdate(docToUpdate));
return this.taskStore.taskValidator.getValidatedTaskInstanceFromReading(result, {
validate: options.validate,
});
}
public async remove(id: string): Promise<void> {

View file

@ -12,6 +12,7 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
@ -64,6 +65,7 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
@ -114,6 +116,7 @@ describe('config validation', () => {
};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,

View file

@ -137,6 +137,7 @@ export const configSchema = schema.object(
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
authenticate_background_task_utilization: schema.boolean({ defaultValue: true }),
}),
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
},
{
validate: (config) => {

View file

@ -50,6 +50,7 @@ describe('EphemeralTaskLifecycle', () => {
poll_interval: 6000000,
version_conflict_threshold: 80,
request_capacity: 1000,
allow_reading_invalid_state: false,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { injectTask } from './inject_task';
export { setupTestServers } from './setup_test_servers';
export { retry } from './retry';

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { type ElasticsearchClient } from '@kbn/core/server';
import { type ConcreteTaskInstance } from '../../task';
export async function injectTask(
esClient: ElasticsearchClient,
{ id, ...task }: ConcreteTaskInstance
) {
const soId = `task:${id}`;
await esClient.index({
id: soId,
index: '.kibana_task_manager',
document: {
references: [],
type: 'task',
updated_at: new Date().toISOString(),
task: {
...task,
state: JSON.stringify(task.state),
params: JSON.stringify(task.params),
runAt: task.runAt.toISOString(),
scheduledAt: task.scheduledAt.toISOString(),
},
},
});
}

View file

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
interface RetryOpts {
times: number;
intervalMs: number;
}
export async function retry<T>(
cb: () => Promise<T>,
options: RetryOpts = { times: 60, intervalMs: 500 }
) {
let attempt = 1;
while (true) {
try {
return await cb();
} catch (e) {
if (attempt >= options.times) {
throw e;
}
}
attempt++;
await new Promise((resolve) => setTimeout(resolve, options.intervalMs));
}
}

View file

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import deepmerge from 'deepmerge';
import { createTestServers, createRootWithCorePlugins } from '@kbn/core-test-helpers-kbn-server';
export async function setupTestServers(settings = {}) {
const { startES } = createTestServers({
adjustTimeout: (t) => jest.setTimeout(t),
settings: {
es: {
license: 'trial',
},
},
});
const esServer = await startES();
const root = createRootWithCorePlugins(
deepmerge(
{
logging: {
root: {
level: 'warn',
},
loggers: [
{
name: 'plugins.taskManager',
level: 'all',
},
],
},
},
settings
),
{ oss: false }
);
await root.preboot();
const coreSetup = await root.setup();
const coreStart = await root.start();
return {
esServer,
kibanaServer: {
root,
coreSetup,
coreStart,
stop: async () => await root.shutdown(),
},
};
}

View file

@ -43,6 +43,7 @@ describe('managed configuration', () => {
max_workers: 10,
max_attempts: 9,
poll_interval: 3000,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {

View file

@ -0,0 +1,326 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
type TestElasticsearchUtils,
type TestKibanaUtils,
} from '@kbn/core-test-helpers-kbn-server';
import { schema } from '@kbn/config-schema';
import { TaskStatus } from '../task';
import { type TaskPollingLifecycleOpts } from '../polling_lifecycle';
import { type TaskClaimingOpts } from '../queries/task_claiming';
import { TaskManagerPlugin, type TaskManagerStartContract } from '../plugin';
import { injectTask, setupTestServers, retry } from './lib';
const { TaskPollingLifecycle: TaskPollingLifecycleMock } = jest.requireMock('../polling_lifecycle');
jest.mock('../polling_lifecycle', () => {
const actual = jest.requireActual('../polling_lifecycle');
return {
...actual,
TaskPollingLifecycle: jest.fn().mockImplementation((opts) => {
return new actual.TaskPollingLifecycle(opts);
}),
};
});
const mockTaskTypeRunFn = jest.fn();
const mockCreateTaskRunner = jest.fn();
const mockTaskType = {
title: '',
description: '',
stateSchemaByVersion: {
1: {
up: (state: Record<string, unknown>) => ({ foo: state.foo || '' }),
schema: schema.object({
foo: schema.string(),
}),
},
2: {
up: (state: Record<string, unknown>) => ({ ...state, bar: state.bar || '' }),
schema: schema.object({
foo: schema.string(),
bar: schema.string(),
}),
},
3: {
up: (state: Record<string, unknown>) => ({ ...state, baz: state.baz || '' }),
schema: schema.object({
foo: schema.string(),
bar: schema.string(),
baz: schema.string(),
}),
},
},
createTaskRunner: mockCreateTaskRunner.mockImplementation(() => ({
run: mockTaskTypeRunFn,
})),
};
jest.mock('../queries/task_claiming', () => {
const actual = jest.requireActual('../queries/task_claiming');
return {
...actual,
TaskClaiming: jest.fn().mockImplementation((opts: TaskClaimingOpts) => {
// We need to register here because once the class is instantiated, adding
// definitions won't get claimed because of "partitionIntoClaimingBatches".
opts.definitions.registerTaskDefinitions({
fooType: mockTaskType,
});
return new actual.TaskClaiming(opts);
}),
};
});
const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');
describe('task state validation', () => {
describe('allow_reading_invalid_state: true', () => {
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
let taskManagerPlugin: TaskManagerStartContract;
let pollingLifecycleOpts: TaskPollingLifecycleOpts;
beforeAll(async () => {
const setupResult = await setupTestServers();
esServer = setupResult.esServer;
kibanaServer = setupResult.kibanaServer;
expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;
expect(TaskPollingLifecycleMock).toHaveBeenCalledTimes(1);
pollingLifecycleOpts = TaskPollingLifecycleMock.mock.calls[0][0];
});
afterAll(async () => {
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});
beforeEach(() => {
jest.clearAllMocks();
});
afterEach(async () => {
await taskManagerPlugin.removeIfExists('foo');
});
it('should drop unknown fields from the task state', async () => {
const taskRunnerPromise = new Promise((resolve) => {
mockTaskTypeRunFn.mockImplementation(() => {
setTimeout(resolve, 0);
return { state: {} };
});
});
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: 'foo',
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test', invalidProperty: 'invalid' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
await taskRunnerPromise;
expect(mockCreateTaskRunner).toHaveBeenCalledTimes(1);
const call = mockCreateTaskRunner.mock.calls[0][0];
expect(call.taskInstance.state).toEqual({
foo: 'test',
bar: 'test',
baz: 'test',
});
});
it('should fail to update the task if the task runner returns an unknown property in the state', async () => {
const errorLogSpy = jest.spyOn(pollingLifecycleOpts.logger, 'error');
const taskRunnerPromise = new Promise((resolve) => {
mockTaskTypeRunFn.mockImplementation(() => {
setTimeout(resolve, 0);
return { state: { invalidField: true, foo: 'test', bar: 'test', baz: 'test' } };
});
});
await taskManagerPlugin.schedule({
id: 'foo',
taskType: 'fooType',
params: {},
state: { foo: 'test', bar: 'test', baz: 'test' },
schedule: { interval: '1d' },
});
await taskRunnerPromise;
expect(mockCreateTaskRunner).toHaveBeenCalledTimes(1);
const call = mockCreateTaskRunner.mock.calls[0][0];
expect(call.taskInstance.state).toEqual({
foo: 'test',
bar: 'test',
baz: 'test',
});
expect(errorLogSpy).toHaveBeenCalledWith(
'Task fooType "foo" failed: Error: [invalidField]: definition for this key is missing',
expect.anything()
);
});
it('should migrate the task state', async () => {
const taskRunnerPromise = new Promise((resolve) => {
mockTaskTypeRunFn.mockImplementation(() => {
setTimeout(resolve, 0);
return { state: {} };
});
});
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: 'foo',
taskType: 'fooType',
params: { foo: true },
state: {},
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
await taskRunnerPromise;
expect(mockCreateTaskRunner).toHaveBeenCalledTimes(1);
const call = mockCreateTaskRunner.mock.calls[0][0];
expect(call.taskInstance.state).toEqual({
foo: '',
bar: '',
baz: '',
});
});
it('should debug log by default when reading an invalid task state', async () => {
const debugLogSpy = jest.spyOn(pollingLifecycleOpts.logger, 'debug');
const taskRunnerPromise = new Promise((resolve) => {
mockTaskTypeRunFn.mockImplementation(() => {
setTimeout(resolve, 0);
return { state: {} };
});
});
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: 'foo',
taskType: 'fooType',
params: { foo: true },
state: { foo: true, bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
await taskRunnerPromise;
expect(mockCreateTaskRunner).toHaveBeenCalledTimes(1);
const call = mockCreateTaskRunner.mock.calls[0][0];
expect(call.taskInstance.state).toEqual({
foo: true,
bar: 'test',
baz: 'test',
});
expect(debugLogSpy).toHaveBeenCalledWith(
`[fooType][foo] Failed to validate the task's state. Allowing read operation to proceed because allow_reading_invalid_state is true. Error: [foo]: expected value of type [string] but got [boolean]`
);
});
});
describe('allow_reading_invalid_state: false', () => {
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
let taskManagerPlugin: TaskManagerStartContract;
let pollingLifecycleOpts: TaskPollingLifecycleOpts;
beforeAll(async () => {
const setupResult = await setupTestServers({
xpack: {
task_manager: {
allow_reading_invalid_state: false,
},
},
});
esServer = setupResult.esServer;
kibanaServer = setupResult.kibanaServer;
expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;
expect(TaskPollingLifecycleMock).toHaveBeenCalledTimes(1);
pollingLifecycleOpts = TaskPollingLifecycleMock.mock.calls[0][0];
});
afterAll(async () => {
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});
beforeEach(() => {
jest.clearAllMocks();
});
afterEach(async () => {
await taskManagerPlugin.removeIfExists('foo');
});
it('should fail the task run when setting allow_reading_invalid_state:false and reading an invalid state', async () => {
const errorLogSpy = jest.spyOn(pollingLifecycleOpts.logger, 'error');
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: 'foo',
taskType: 'fooType',
params: { foo: true },
state: { foo: true, bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
await retry(async () => {
expect(errorLogSpy).toHaveBeenCalledWith(
`Failed to poll for work: Error: [foo]: expected value of type [string] but got [boolean]`
);
});
expect(mockCreateTaskRunner).not.toHaveBeenCalled();
});
});
});

View file

@ -44,7 +44,7 @@ describe('Bulk Operation Buffer', () => {
return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]);
});
const bufferedUpdate = createBuffer(bulkUpdate);
const bufferedUpdate = createBuffer(bulkUpdate, {});
const task1 = createTask();
const task2 = createTask();
@ -173,7 +173,7 @@ describe('Bulk Operation Buffer', () => {
}
);
const bufferedUpdate = createBuffer(bulkUpdate);
const bufferedUpdate = createBuffer(bulkUpdate, {});
const task1 = createTask();
const task2 = createTask();
@ -195,7 +195,7 @@ describe('Bulk Operation Buffer', () => {
return Promise.reject(new Error('bulkUpdate is an illusion'));
});
const bufferedUpdate = createBuffer(bulkUpdate);
const bufferedUpdate = createBuffer(bulkUpdate, {});
const task1 = createTask();
const task2 = createTask();

View file

@ -39,7 +39,7 @@ const FLUSH = true;
export function createBuffer<T extends Entity>(
bulkOperation: BulkOperation<T>,
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions = {}
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions
): Operation<T> {
const flushBuffer = new Subject<void>();

View file

@ -32,25 +32,26 @@ describe('retryableBulkUpdate()', () => {
});
it('should call getTasks with taskIds', async () => {
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false });
expect(getTasks).toHaveBeenCalledWith(taskIds);
});
it('should filter tasks returned from getTasks', async () => {
filter.mockImplementation((task) => task.id === '2');
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false });
expect(filter).toHaveBeenCalledTimes(3);
// Map happens after filter
expect(map).toHaveBeenCalledTimes(1);
expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[1]]);
expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[1]], { validate: false });
});
it('should map tasks returned from getTasks', async () => {
map.mockImplementation((task) => ({ ...task, status: TaskStatus.Claiming }));
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false });
expect(map).toHaveBeenCalledTimes(3);
expect(store.bulkUpdate).toHaveBeenCalledWith(
tasks.map((task) => ({ ...task, status: TaskStatus.Claiming }))
tasks.map((task) => ({ ...task, status: TaskStatus.Claiming })),
{ validate: false }
);
});
@ -71,9 +72,9 @@ describe('retryableBulkUpdate()', () => {
]);
getTasks.mockResolvedValueOnce([tasks[0]].map((task) => asOk(task)));
store.bulkUpdate.mockResolvedValueOnce(tasks.map((task) => asOk(task)));
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false });
expect(store.bulkUpdate).toHaveBeenCalledTimes(2);
expect(store.bulkUpdate).toHaveBeenNthCalledWith(2, [tasks[0]]);
expect(store.bulkUpdate).toHaveBeenNthCalledWith(2, [tasks[0]], { validate: false });
});
it('should skip updating tasks that cannot be found', async () => {
@ -86,7 +87,7 @@ describe('retryableBulkUpdate()', () => {
}),
asOk(tasks[2]),
]);
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[0], tasks[2]]);
await retryableBulkUpdate({ taskIds, getTasks, filter, map, store, validate: false });
expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[0], tasks[2]], { validate: false });
});
});


@ -19,6 +19,7 @@ export interface RetryableBulkUpdateOpts {
filter: (task: ConcreteTaskInstance) => boolean;
map: (task: ConcreteTaskInstance) => ConcreteTaskInstance;
store: TaskStore;
validate: boolean;
}
export async function retryableBulkUpdate({
@ -27,6 +28,7 @@ export async function retryableBulkUpdate({
filter,
map,
store,
validate,
}: RetryableBulkUpdateOpts): Promise<BulkUpdateTaskResult> {
const resultMap: Record<string, BulkUpdateResult> = {};
@ -42,7 +44,7 @@ export async function retryableBulkUpdate({
}, [])
.filter(filter)
.map(map);
const bulkUpdateResult = await store.bulkUpdate(tasksToUpdate);
const bulkUpdateResult = await store.bulkUpdate(tasksToUpdate, { validate });
for (const result of bulkUpdateResult) {
const taskId = getId(result);
resultMap[taskId] = result;


@ -16,6 +16,7 @@ describe('Configuration Statistics Aggregator', () => {
max_workers: 10,
max_attempts: 9,
poll_interval: 6000000,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
request_capacity: 1000,


@ -20,6 +20,7 @@ describe('createMonitoringStatsStream', () => {
max_workers: 10,
max_attempts: 9,
poll_interval: 6000000,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
request_capacity: 1000,


@ -43,6 +43,7 @@ const pluginInitializerContextParams = {
poll_interval: 3000,
version_conflict_threshold: 80,
request_capacity: 1000,
allow_reading_invalid_state: false,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
enabled: false,


@ -227,6 +227,8 @@ export class TaskManagerPlugin
definitions: this.definitions,
taskManagerId: `kibana:${this.taskManagerId!}`,
adHocTaskCounter: this.adHocTaskCounter,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
logger: this.logger,
});
const managedConfiguration = createManagedConfiguration({


@ -48,6 +48,7 @@ describe('TaskPollingLifecycle', () => {
poll_interval: 6000000,
version_conflict_threshold: 80,
request_capacity: 1000,
allow_reading_invalid_state: false,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
enabled: false,


@ -5,7 +5,7 @@
* 2.0.
*/
import { schema, TypeOf } from '@kbn/config-schema';
import { schema, TypeOf, ObjectType } from '@kbn/config-schema';
import { Interval, isInterval, parseIntervalAsMillisecond } from './lib/intervals';
import { isErr, tryAsResult } from './lib/result_type';
@ -138,6 +138,15 @@ export const taskDefinitionSchema = schema.object(
min: 0,
})
),
stateSchemaByVersion: schema.maybe(
schema.recordOf(
schema.string(),
schema.object({
schema: schema.any(),
up: schema.any(),
})
)
),
},
{
validate({ timeout }) {
@ -158,6 +167,13 @@ export type TaskDefinition = TypeOf<typeof taskDefinitionSchema> & {
* and an optional cancel function which cancels the task.
*/
createTaskRunner: TaskRunCreatorFunction;
stateSchemaByVersion?: Record<
number,
{
schema: ObjectType;
up: (state: Record<string, unknown>) => Record<string, unknown>;
}
>;
};
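The read path described in the PR (migrate the stored task state up to the latest registered version, then validate against the latest schema) can be sketched as a self-contained helper. The `migrateToLatest` name and the plain `validate` functions are assumptions for illustration, simplified stand-ins for what `TaskValidator` and `@kbn/config-schema` do internally:

```typescript
// Simplified stand-ins: the real code uses ObjectType from @kbn/config-schema
// and lives in TaskValidator; names here are assumptions for illustration.
type State = Record<string, unknown>;

interface StateVersionDef {
  up: (state: State) => State;
  // Stand-in for schema.validate(): throws on invalid state.
  validate: (state: State) => State;
}

// Walk the stored state from its persisted stateVersion up to the latest
// registered version, then validate against the latest schema.
function migrateToLatest(
  byVersion: Record<number, StateVersionDef>,
  state: State,
  stateVersion?: number
): { state: State; stateVersion: number } {
  const versions = Object.keys(byVersion)
    .map(Number)
    .sort((a, b) => a - b);
  const latest = versions[versions.length - 1];
  let migrated = state;
  for (const version of versions) {
    // Tasks without a stateVersion predate versioning and run every `up`.
    if (stateVersion === undefined || version > stateVersion) {
      migrated = byVersion[version].up(migrated);
    }
  }
  return { state: byVersion[latest].validate(migrated), stateVersion: latest };
}
```

Tasks already at the latest version skip all `up` migrations and only pass through the final validation.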
export enum TaskStatus {
@ -248,6 +264,7 @@ export interface TaskInstance {
// this can be fixed by supporting generics in the future
// eslint-disable-next-line @typescript-eslint/no-explicit-any
state: Record<string, any>;
stateVersion?: number;
/**
* The serialized traceparent string of the current APM transaction or span.

View file

@ -23,10 +23,10 @@ import moment from 'moment';
import { TaskDefinitionRegistry, TaskTypeDictionary } from '../task_type_dictionary';
import { mockLogger } from '../test_utils';
import { throwRetryableError, throwUnrecoverableError } from './errors';
import { taskStoreMock } from '../task_store.mock';
import apm from 'elastic-apm-node';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { usageCountersServiceMock } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counters_service.mock';
import { bufferedTaskStoreMock } from '../buffered_task_store.mock';
import {
calculateDelay,
TASK_MANAGER_RUN_TRANSACTION_TYPE,
@ -432,17 +432,20 @@ describe('TaskManagerRunner', () => {
`[Error: type: Bad Request]`
);
expect(store.update).toHaveBeenCalledWith({
...mockInstance({
id,
attempts: initialAttempts + 1,
schedule: undefined,
}),
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
expect(store.update).toHaveBeenCalledWith(
{
...mockInstance({
id,
attempts: initialAttempts + 1,
schedule: undefined,
}),
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
},
{ validate: false }
);
});
test(`it doesn't try to increment a task's attempts when markTaskAsRunning fails for version conflict`, async () => {
@ -834,7 +837,9 @@ describe('TaskManagerRunner', () => {
await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }));
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});
});
test('reschedules tasks that return a schedule', async () => {
@ -862,7 +867,9 @@ describe('TaskManagerRunner', () => {
await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }));
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});
});
test(`doesn't reschedule recurring tasks that throw an unrecoverable error`, async () => {
@ -936,7 +943,9 @@ describe('TaskManagerRunner', () => {
await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }));
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});
});
test('removes non-recurring tasks after they complete', async () => {
@ -1654,7 +1663,7 @@ describe('TaskManagerRunner', () => {
const instance = mockInstance(opts.instance);
const store = taskStoreMock.create();
const store = bufferedTaskStoreMock.create();
const usageCounter = usageCountersServiceMock.createSetupContract().createUsageCounter('test');
store.update.mockResolvedValue(instance);


@ -89,7 +89,7 @@ export interface TaskRunning<Stage extends TaskRunningStage, Instance> {
}
export interface Updatable {
update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance>;
update(doc: ConcreteTaskInstance, options: { validate: boolean }): Promise<ConcreteTaskInstance>;
remove(id: string): Promise<void>;
}
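The new `validate` flag on `Updatable.update` lets Task Manager skip state validation on writes that only touch task-manager-owned fields (status, attempts, `retryAt`), while still validating when persisting the state a task runner returned. A minimal sketch of an implementation with that behavior, assuming a hypothetical `InMemoryStore` that is not part of the PR:

```typescript
type State = Record<string, unknown>;

interface TaskDoc {
  id: string;
  state: State;
}

// Hypothetical minimal Updatable implementation. Claim/release writes in the
// PR pass { validate: false } because they only mutate task-manager-owned
// fields; the final write of a task runner's returned state passes
// { validate: true } so schema mismatches surface during development.
class InMemoryStore {
  private docs = new Map<string, TaskDoc>();

  // validateState stands in for TaskValidator + the latest versioned schema.
  constructor(private readonly validateState: (state: State) => State) {}

  update(doc: TaskDoc, options: { validate: boolean }): TaskDoc {
    const state = options.validate ? this.validateState(doc.state) : doc.state;
    const updated = { ...doc, state };
    this.docs.set(doc.id, updated);
    return updated;
  }

  get(id: string): TaskDoc | undefined {
    return this.docs.get(id);
  }
}
```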
@ -395,27 +395,30 @@ export class TaskManagerRunner implements TaskRunner {
}
this.instance = asReadyToRun(
(await this.bufferedTaskStore.update({
...taskWithoutEnabled(taskInstance),
status: TaskStatus.Running,
startedAt: now,
attempts,
retryAt:
(this.instance.task.schedule
? maxIntervalFromDate(
now,
this.instance.task.schedule.interval,
this.definition.timeout
)
: this.getRetryDelay({
attempts,
// Fake an error. This allows retry logic when tasks keep timing out
// and lets us set a proper "retryAt" value each time.
error: new Error('Task timeout'),
addDuration: this.definition.timeout,
})) ?? null,
// This is a safe conversion as we're setting the startedAt above
})) as ConcreteTaskInstanceWithStartedAt
(await this.bufferedTaskStore.update(
{
...taskWithoutEnabled(taskInstance),
status: TaskStatus.Running,
startedAt: now,
attempts,
retryAt:
(this.instance.task.schedule
? maxIntervalFromDate(
now,
this.instance.task.schedule.interval,
this.definition.timeout
)
: this.getRetryDelay({
attempts,
// Fake an error. This allows retry logic when tasks keep timing out
// and lets us set a proper "retryAt" value each time.
error: new Error('Task timeout'),
addDuration: this.definition.timeout,
})) ?? null,
// This is a safe conversion as we're setting the startedAt above
},
{ validate: false }
)) as ConcreteTaskInstanceWithStartedAt
);
const timeUntilClaimExpiresAfterUpdate =
@ -476,14 +479,17 @@ export class TaskManagerRunner implements TaskRunner {
private async releaseClaimAndIncrementAttempts(): Promise<Result<ConcreteTaskInstance, Error>> {
return promiseResult(
this.bufferedTaskStore.update({
...taskWithoutEnabled(this.instance.task),
status: TaskStatus.Idle,
attempts: this.instance.task.attempts + 1,
startedAt: null,
retryAt: null,
ownerId: null,
})
this.bufferedTaskStore.update(
{
...taskWithoutEnabled(this.instance.task),
status: TaskStatus.Idle,
attempts: this.instance.task.attempts + 1,
startedAt: null,
retryAt: null,
ownerId: null,
},
{ validate: false }
)
);
}
@ -580,7 +586,8 @@ export class TaskManagerRunner implements TaskRunner {
ownerId: null,
},
taskWithoutEnabled(this.instance.task)
)
),
{ validate: true }
)
);
}


@ -538,7 +538,8 @@ describe('TaskScheduling', () => {
status: TaskStatus.Idle,
runAt: expect.any(Date),
scheduledAt: expect.any(Date),
})
}),
{ validate: false }
);
expect(mockTaskStore.get).toHaveBeenCalledWith(id);
expect(result).toEqual({ id });
@ -560,7 +561,8 @@ describe('TaskScheduling', () => {
status: TaskStatus.Idle,
runAt: expect.any(Date),
scheduledAt: expect.any(Date),
})
}),
{ validate: false }
);
expect(mockTaskStore.get).toHaveBeenCalledWith(id);
expect(result).toEqual({ id });


@ -159,6 +159,7 @@ export class TaskScheduling {
getTasks: async (ids) => await this.bulkGetTasksHelper(ids),
filter: (task) => !!task.enabled,
map: (task) => ({ ...task, enabled: false }),
validate: false,
});
}
@ -174,6 +175,7 @@ export class TaskScheduling {
}
return { ...task, enabled: true };
},
validate: false,
});
}
@ -208,6 +210,7 @@ export class TaskScheduling {
return { ...task, schedule, runAt: new Date(newRunAtInMs) };
},
validate: false,
});
}
@ -229,12 +232,15 @@ export class TaskScheduling {
public async runSoon(taskId: string): Promise<RunSoonResult> {
const task = await this.getNonRunningTask(taskId);
try {
await this.store.update({
...task,
status: TaskStatus.Idle,
scheduledAt: new Date(),
runAt: new Date(),
});
await this.store.update(
{
...task,
status: TaskStatus.Idle,
scheduledAt: new Date(),
runAt: new Date(),
},
{ validate: false }
);
} catch (e) {
if (e.statusCode === 409) {
this.logger.debug(


@ -14,6 +14,10 @@ interface TaskStoreOptions {
export const taskStoreMock = {
create({ index = '', taskManagerId = '' }: TaskStoreOptions = {}) {
const mocked = {
taskValidator: {
getValidatedTaskInstanceFromReading: jest.fn().mockImplementation((task) => task),
getValidatedTaskInstanceForUpdating: jest.fn().mockImplementation((task) => task),
},
convertToSavedObjectIds: jest.fn(),
update: jest.fn(),
remove: jest.fn(),


@ -5,6 +5,7 @@
* 2.0.
*/
import { schema } from '@kbn/config-schema';
import { Client } from '@elastic/elasticsearch';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import _ from 'lodash';
@ -25,13 +26,36 @@ import { mockLogger } from './test_utils';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { asErr } from './lib/result_type';
const mockGetValidatedTaskInstanceFromReading = jest.fn();
const mockGetValidatedTaskInstanceForUpdating = jest.fn();
jest.mock('./task_validator', () => {
return {
TaskValidator: jest.fn().mockImplementation(() => {
return {
getValidatedTaskInstanceFromReading: mockGetValidatedTaskInstanceFromReading,
getValidatedTaskInstanceForUpdating: mockGetValidatedTaskInstanceForUpdating,
};
}),
};
});
const savedObjectsClient = savedObjectsRepositoryMock.create();
const serializer = savedObjectsServiceMock.createSerializer();
const adHocTaskCounter = new AdHocTaskCounter();
const randomId = () => `id-${_.random(1, 20)}`;
beforeEach(() => jest.resetAllMocks());
beforeEach(() => {
jest.resetAllMocks();
jest.requireMock('./task_validator').TaskValidator.mockImplementation(() => {
return {
getValidatedTaskInstanceFromReading: mockGetValidatedTaskInstanceFromReading,
getValidatedTaskInstanceForUpdating: mockGetValidatedTaskInstanceForUpdating,
};
});
mockGetValidatedTaskInstanceFromReading.mockImplementation((task) => task);
mockGetValidatedTaskInstanceForUpdating.mockImplementation((task) => task);
});
const mockedDate = new Date('2019-02-12T21:01:22.479Z');
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@ -49,6 +73,14 @@ const taskDefinitions = new TaskTypeDictionary(mockLogger());
taskDefinitions.registerTaskDefinitions({
report: {
title: 'report',
stateSchemaByVersion: {
1: {
schema: schema.object({
foo: schema.string(),
}),
up: (doc) => doc,
},
},
createTaskRunner: jest.fn(),
},
dernstraight: {
@ -67,6 +99,7 @@ describe('TaskStore', () => {
beforeAll(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -74,6 +107,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -237,6 +271,7 @@ describe('TaskStore', () => {
childEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
esClient.child.mockReturnValue(childEsClient as unknown as Client);
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -244,6 +279,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -306,6 +342,7 @@ describe('TaskStore', () => {
beforeAll(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -313,6 +350,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -404,6 +442,7 @@ describe('TaskStore', () => {
beforeAll(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -411,6 +450,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -443,8 +483,16 @@ describe('TaskStore', () => {
}
);
const result = await store.update(task);
const result = await store.update(task, { validate: true });
expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledTimes(1);
expect(mockGetValidatedTaskInstanceFromReading).toHaveBeenCalledTimes(1);
expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(task, {
validate: true,
});
expect(mockGetValidatedTaskInstanceFromReading).toHaveBeenCalledWith(task, {
validate: true,
});
expect(savedObjectsClient.update).toHaveBeenCalledWith(
'task',
task.id,
@ -478,6 +526,42 @@ describe('TaskStore', () => {
});
});
test(`doesn't go through validation process to inject stateVersion when validate:false`, async () => {
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id: 'task:324242',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
version: '123',
ownerId: null,
traceparent: 'myTraceparent',
};
savedObjectsClient.update.mockImplementation(
async (type: string, id: string, attributes: SavedObjectAttributes) => {
return {
id,
type,
attributes,
references: [],
version: '123',
};
}
);
await store.update(task, { validate: false });
expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(task, {
validate: false,
});
});
test('pushes error from saved objects client to errors$', async () => {
const task = {
runAt: mockedDate,
@ -497,7 +581,9 @@ describe('TaskStore', () => {
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.update.mockRejectedValue(new Error('Failure'));
await expect(store.update(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
await expect(
store.update(task, { validate: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});
@ -507,6 +593,7 @@ describe('TaskStore', () => {
beforeAll(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -514,6 +601,47 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
test(`doesn't validate whenever validate:false is passed-in`, async () => {
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id: 'task:324242',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
version: '123',
ownerId: null,
traceparent: '',
};
savedObjectsClient.bulkUpdate.mockResolvedValue({
saved_objects: [
{
id: '324242',
type: 'task',
attributes: {
...task,
state: '{"foo":"bar"}',
params: '{"hello":"world"}',
},
references: [],
version: '123',
},
],
});
await store.bulkUpdate([task], { validate: false });
expect(mockGetValidatedTaskInstanceForUpdating).toHaveBeenCalledWith(task, {
validate: false,
});
});
@ -536,9 +664,9 @@ describe('TaskStore', () => {
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.bulkUpdate.mockRejectedValue(new Error('Failure'));
await expect(store.bulkUpdate([task])).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure"`
);
await expect(
store.bulkUpdate([task], { validate: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});
@ -548,6 +676,7 @@ describe('TaskStore', () => {
beforeAll(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -555,6 +684,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -582,6 +712,7 @@ describe('TaskStore', () => {
beforeAll(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -589,6 +720,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -616,6 +748,7 @@ describe('TaskStore', () => {
beforeAll(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -623,6 +756,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -635,6 +769,7 @@ describe('TaskStore', () => {
id: randomId(),
params: { hello: 'world' },
state: { foo: 'bar' },
stateVersion: 1,
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
@ -673,6 +808,7 @@ describe('TaskStore', () => {
beforeAll(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -680,6 +816,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -745,6 +882,7 @@ describe('TaskStore', () => {
id: randomId(),
params: { hello: 'world' },
state: { foo: 'bar' },
stateVersion: 1,
taskType: 'report',
attempts: 3,
status: status as TaskStatus,
@ -765,6 +903,7 @@ describe('TaskStore', () => {
}));
const store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -772,6 +911,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
expect(await store.getLifecycle(task.id)).toEqual(status);
@ -785,6 +925,7 @@ describe('TaskStore', () => {
);
const store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -792,6 +933,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
expect(await store.getLifecycle(randomId())).toEqual(TaskLifecycleResult.NotFound);
@ -803,6 +945,7 @@ describe('TaskStore', () => {
);
const store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -810,6 +953,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
return expect(store.getLifecycle(randomId())).rejects.toThrow('Bad Request');
@ -821,6 +965,7 @@ describe('TaskStore', () => {
beforeAll(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
@ -828,6 +973,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
});
@ -849,6 +995,7 @@ describe('TaskStore', () => {
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
stateVersion: 1,
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
@ -909,6 +1056,7 @@ describe('TaskStore', () => {
scope: undefined,
startedAt: null,
state: { foo: 'bar' },
stateVersion: 1,
status: 'idle',
taskType: 'report',
user: undefined,
@ -981,4 +1129,50 @@ describe('TaskStore', () => {
expect(adHocTaskCounter.count).toEqual(0);
});
});
describe('TaskValidator', () => {
test(`should pass allowReadingInvalidState:false accordingly`, () => {
const logger = mockLogger();
new TaskStore({
logger,
index: 'tasky',
taskManagerId: '',
serializer,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: false,
});
expect(jest.requireMock('./task_validator').TaskValidator).toHaveBeenCalledWith({
logger,
definitions: taskDefinitions,
allowReadingInvalidState: false,
});
});
test(`should pass allowReadingInvalidState:true accordingly`, () => {
const logger = mockLogger();
new TaskStore({
logger,
index: 'tasky',
taskManagerId: '',
serializer,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
allowReadingInvalidState: true,
});
expect(jest.requireMock('./task_validator').TaskValidator).toHaveBeenCalledWith({
logger,
definitions: taskDefinitions,
allowReadingInvalidState: true,
});
});
});
});


@ -13,7 +13,7 @@ import { omit, defaults, get } from 'lodash';
import { SavedObjectError } from '@kbn/core-saved-objects-common';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { SavedObjectsBulkDeleteResponse } from '@kbn/core/server';
import type { SavedObjectsBulkDeleteResponse, Logger } from '@kbn/core/server';
import {
SavedObject,
@ -36,6 +36,7 @@ import {
import { TaskTypeDictionary } from './task_type_dictionary';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { TaskValidator } from './task_validator';
export interface StoreOpts {
esClient: ElasticsearchClient;
@ -45,6 +46,8 @@ export interface StoreOpts {
savedObjectsRepository: ISavedObjectsRepository;
serializer: ISavedObjectsSerializer;
adHocTaskCounter: AdHocTaskCounter;
allowReadingInvalidState: boolean;
logger: Logger;
}
export interface SearchOpts {
@ -97,6 +100,7 @@ export class TaskStore {
public readonly index: string;
public readonly taskManagerId: string;
public readonly errors$ = new Subject<Error>();
public readonly taskValidator: TaskValidator;
private esClient: ElasticsearchClient;
private esClientWithoutRetries: ElasticsearchClient;
@ -122,6 +126,11 @@ export class TaskStore {
this.serializer = opts.serializer;
this.savedObjectsRepository = opts.savedObjectsRepository;
this.adHocTaskCounter = opts.adHocTaskCounter;
this.taskValidator = new TaskValidator({
logger: opts.logger,
definitions: opts.definitions,
allowReadingInvalidState: opts.allowReadingInvalidState,
});
this.esClientWithoutRetries = opts.esClient.child({
// Timeouts are retried and make requests timeout after (requestTimeout * (1 + maxRetries))
// The poller doesn't need retry logic because it will try again at the next polling cycle
@ -150,9 +159,11 @@ export class TaskStore {
let savedObject;
try {
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
savedObject = await this.savedObjectsRepository.create<SerializedConcreteTaskInstance>(
'task',
taskInstanceToAttributes(taskInstance),
taskInstanceToAttributes(validatedTaskInstance),
{ id: taskInstance.id, refresh: false }
);
if (get(taskInstance, 'schedule.interval', null) == null) {
@ -163,7 +174,8 @@ export class TaskStore {
throw e;
}
return savedObjectToConcreteTaskInstance(savedObject);
const result = savedObjectToConcreteTaskInstance(savedObject);
return this.taskValidator.getValidatedTaskInstanceFromReading(result);
}
/**
@ -174,9 +186,11 @@ export class TaskStore {
public async bulkSchedule(taskInstances: TaskInstance[]): Promise<ConcreteTaskInstance[]> {
const objects = taskInstances.map((taskInstance) => {
this.definitions.ensureHas(taskInstance.taskType);
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
return {
type: 'task',
attributes: taskInstanceToAttributes(taskInstance),
attributes: taskInstanceToAttributes(validatedTaskInstance),
id: taskInstance.id,
};
});
@ -197,7 +211,10 @@ export class TaskStore {
throw e;
}
return savedObjects.saved_objects.map((so) => savedObjectToConcreteTaskInstance(so));
return savedObjects.saved_objects.map((so) => {
const taskInstance = savedObjectToConcreteTaskInstance(so);
return this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance);
});
}
/**
@ -222,8 +239,14 @@ export class TaskStore {
* @param {TaskDoc} doc
* @returns {Promise<TaskDoc>}
*/
public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
const attributes = taskInstanceToAttributes(doc);
public async update(
doc: ConcreteTaskInstance,
options: { validate: boolean }
): Promise<ConcreteTaskInstance> {
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
const attributes = taskInstanceToAttributes(taskInstance);
let updatedSavedObject;
try {
@ -241,13 +264,16 @@ export class TaskStore {
throw e;
}
return savedObjectToConcreteTaskInstance(
const result = savedObjectToConcreteTaskInstance(
// The SavedObjects update api forces a Partial on the `attributes` on the response,
// but actually returns the whole object that is passed to it, so as we know we're
// passing in the whole object, this is safe to do.
// This is far from ideal, but unless we change the SavedObjectsClient this is the best we can do
{ ...updatedSavedObject, attributes: defaults(updatedSavedObject.attributes, attributes) }
);
return this.taskValidator.getValidatedTaskInstanceFromReading(result, {
validate: options.validate,
});
}
/**
@ -257,9 +283,15 @@ export class TaskStore {
* @param {Array<TaskDoc>} docs
* @returns {Promise<Array<TaskDoc>>}
*/
public async bulkUpdate(docs: ConcreteTaskInstance[]): Promise<BulkUpdateResult[]> {
public async bulkUpdate(
docs: ConcreteTaskInstance[],
options: { validate: boolean }
): Promise<BulkUpdateResult[]> {
const attributesByDocId = docs.reduce((attrsById, doc) => {
attrsById.set(doc.id, taskInstanceToAttributes(doc));
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
attrsById.set(doc.id, taskInstanceToAttributes(taskInstance));
return attrsById;
}, new Map());
@ -283,21 +315,25 @@ export class TaskStore {
}
return updatedSavedObjects.map((updatedSavedObject) => {
return updatedSavedObject.error !== undefined
? asErr({
type: 'task',
id: updatedSavedObject.id,
error: updatedSavedObject.error,
})
: asOk(
savedObjectToConcreteTaskInstance({
...updatedSavedObject,
attributes: defaults(
updatedSavedObject.attributes,
attributesByDocId.get(updatedSavedObject.id)!
),
})
);
if (updatedSavedObject.error !== undefined) {
return asErr({
type: 'task',
id: updatedSavedObject.id,
error: updatedSavedObject.error,
});
}
const taskInstance = savedObjectToConcreteTaskInstance({
...updatedSavedObject,
attributes: defaults(
updatedSavedObject.attributes,
attributesByDocId.get(updatedSavedObject.id)!
),
});
const result = this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance, {
validate: options.validate,
});
return asOk(result);
});
}
@ -346,7 +382,8 @@ export class TaskStore {
this.errors$.next(e);
throw e;
}
return savedObjectToConcreteTaskInstance(result);
const taskInstance = savedObjectToConcreteTaskInstance(result);
return this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance);
}
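On read, the validator drops fields the current schema doesn't know about, so a task written by a newer Kibana still loads after a downgrade. A minimal sketch of that dropping step; `dropUnknownFields` is a hypothetical name, and in the real code this is handled by `TaskValidator` via the schema's unknown-key handling:

```typescript
// Hypothetical helper illustrating the downgrade behavior described in the
// PR: fields the current (older) schema doesn't recognize are dropped on
// read instead of failing validation.
function dropUnknownFields(
  state: Record<string, unknown>,
  knownKeys: string[]
): Record<string, unknown> {
  return Object.fromEntries(
    Object.entries(state).filter(([key]) => knownKeys.includes(key))
  );
}
```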
/**
@ -369,7 +406,10 @@ export class TaskStore {
if (task.error) {
return asErr({ id: task.id, type: task.type, error: task.error });
}
return asOk(savedObjectToConcreteTaskInstance(task));
const taskInstance = savedObjectToConcreteTaskInstance(task);
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceFromReading(taskInstance);
return asOk(validatedTaskInstance);
});
}
@ -413,7 +453,9 @@ export class TaskStore {
// @ts-expect-error @elastic/elasticsearch _source is optional
.map((doc) => this.serializer.rawToSavedObject(doc))
.map((doc) => omit(doc, 'namespace') as SavedObject<SerializedConcreteTaskInstance>)
.map(savedObjectToConcreteTaskInstance),
.map((doc) => savedObjectToConcreteTaskInstance(doc))
.map((doc) => this.taskValidator.getValidatedTaskInstanceFromReading(doc))
.filter((doc): doc is ConcreteTaskInstance => !!doc),
};
} catch (e) {
this.errors$.next(e);


@ -5,6 +5,7 @@
* 2.0.
*/
import { ObjectType } from '@kbn/config-schema';
import { Logger } from '@kbn/core/server';
import { TaskDefinition, taskDefinitionSchema, TaskRunCreatorFunction } from './task';
import { CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE } from './constants';
@@ -65,6 +66,13 @@ export interface TaskRegisterDefinition {
* The default value, if not given, is 0.
*/
maxConcurrency?: number;
/**
 * An optional mapping of task-state versions to their schema and `up`
 * migration, used to validate and migrate the task's `state` object.
 */
stateSchemaByVersion?: Record<
number,
{
schema: ObjectType;
up: (state: Record<string, unknown>) => Record<string, unknown>;
}
>;
}
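The `stateSchemaByVersion` field maps each integer version to an `up` migration and its schema. As a dependency-free sketch of that shape (a plain validator function stands in for `@kbn/config-schema`'s `ObjectType`; the `runs`/`total_invalidated` fields follow the `alerts_invalidate_api_keys` example from the PR description):

```typescript
// Sketch only: `schema` here is a stand-in validator function, not a
// real @kbn/config-schema ObjectType.
type TaskState = Record<string, unknown>;

interface VersionedStateEntry {
  schema: (state: TaskState) => TaskState;
  up: (state: TaskState) => TaskState;
}

const stateSchemaByVersion: Record<number, VersionedStateEntry> = {
  1: {
    // All existing tasks have no version, so they get `up`-migrated to 1.
    up: (state) => ({
      runs: state.runs ?? 0,
      total_invalidated: state.total_invalidated ?? 0,
    }),
    schema: (state) => {
      if (typeof state.runs !== 'number' || typeof state.total_invalidated !== 'number') {
        throw new Error('invalid state');
      }
      return state;
    },
  },
};

console.log(JSON.stringify(stateSchemaByVersion[1].up({})));
// -> {"runs":0,"total_invalidated":0}
```

The `up` function must fill defaults for every field the schema requires, since pre-existing documents may carry an empty state.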
/**


@@ -0,0 +1,397 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema } from '@kbn/config-schema';
import { taskManagerMock } from './mocks';
import { mockLogger } from './test_utils';
import { TaskValidator } from './task_validator';
import { TaskTypeDictionary } from './task_type_dictionary';
const fooTaskDefinition = {
title: 'Foo',
description: '',
createTaskRunner() {
return {
async run() {
return {
state: {},
};
},
};
},
};
describe('TaskValidator', () => {
describe('getValidatedTaskInstanceFromReading()', () => {
it(`should return the task as-is whenever the task definition isn't defined`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask();
const result = taskValidator.getValidatedTaskInstanceFromReading(task);
expect(result).toEqual(task);
});
it(`should return the task as-is whenever the validate:false option is passed-in`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask();
const result = taskValidator.getValidatedTaskInstanceFromReading(task, { validate: false });
expect(result).toEqual(task);
});
// TODO: Remove skip once all task types have defined their state schema.
// https://github.com/elastic/kibana/issues/159347
it.skip(`should fail to validate the state schema when the task type doesn't have stateSchemaByVersion defined`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: fooTaskDefinition,
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ state: { foo: 'bar' } });
expect(() =>
taskValidator.getValidatedTaskInstanceFromReading(task)
).toThrowErrorMatchingInlineSnapshot(
`"[TaskValidator] stateSchemaByVersion not defined for task type: foo"`
);
});
it(`should validate the state schema`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: 'bar' } });
const result = taskValidator.getValidatedTaskInstanceFromReading(task);
expect(result).toEqual(task);
});
it(`should fail validation when the state schema doesn't match the state data`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: true } });
expect(() =>
taskValidator.getValidatedTaskInstanceFromReading(task)
).toThrowErrorMatchingInlineSnapshot(
`"[foo]: expected value of type [string] but got [boolean]"`
);
});
it(`should return original state when the state is invalid and allowReadingInvalidState is true`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: true,
});
const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: true } });
const result = taskValidator.getValidatedTaskInstanceFromReading(task);
expect(result.state).toEqual({ foo: true });
});
it(`should remove unknown fields`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({
stateVersion: 1,
state: { foo: 'foo', bar: 'bar' },
});
const result = taskValidator.getValidatedTaskInstanceFromReading(task);
expect(result.state).toEqual({ foo: 'foo' });
});
it(`should migrate state when reading from a document without stateVersion`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => ({ ...state, baz: 'baz' }),
schema: schema.object({
foo: schema.string(),
baz: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ stateVersion: undefined, state: { foo: 'foo' } });
const result = taskValidator.getValidatedTaskInstanceFromReading(task);
expect(result.state).toEqual({ foo: 'foo', baz: 'baz' });
});
it(`should migrate state when reading from an older version`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
2: {
up: (state) => ({ ...state, baz: 'baz' }),
schema: schema.object({
foo: schema.string(),
baz: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: 'foo' } });
const result = taskValidator.getValidatedTaskInstanceFromReading(task);
expect(result.state).toEqual({ foo: 'foo', baz: 'baz' });
});
it(`should throw during the migration phase if a schema version is missing`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
3: {
up: (state) => ({ ...state, baz: 'baz' }),
schema: schema.object({
foo: schema.string(),
baz: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ stateVersion: 1, state: { foo: 'foo' } });
expect(() =>
taskValidator.getValidatedTaskInstanceFromReading(task)
).toThrowErrorMatchingInlineSnapshot(
`"[TaskValidator] state schema for foo missing version: 2"`
);
});
});
describe('getValidatedTaskInstanceForUpdating()', () => {
it(`should return the task as-is whenever the task definition isn't defined`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask();
const result = taskValidator.getValidatedTaskInstanceForUpdating(task);
expect(result).toEqual(task);
});
it(`should return the task as-is whenever the validate:false option is passed-in`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask();
const result = taskValidator.getValidatedTaskInstanceForUpdating(task, { validate: false });
expect(result).toEqual(task);
});
// TODO: Remove skip once all task types have defined their state schema.
// https://github.com/elastic/kibana/issues/159347
it.skip(`should fail to validate the state schema when the task type doesn't have stateSchemaByVersion defined`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: fooTaskDefinition,
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ state: { foo: 'bar' } });
expect(() =>
taskValidator.getValidatedTaskInstanceForUpdating(task)
).toThrowErrorMatchingInlineSnapshot(
`"[TaskValidator] stateSchemaByVersion not defined for task type: foo"`
);
});
it(`should validate the state schema`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ state: { foo: 'bar' } });
const { stateVersion, ...result } = taskValidator.getValidatedTaskInstanceForUpdating(task);
expect(result).toEqual(task);
expect(stateVersion).toEqual(1);
});
it(`should fail to validate the state schema when unknown fields are present`, () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
...fooTaskDefinition,
stateSchemaByVersion: {
1: {
up: (state) => state,
schema: schema.object({
foo: schema.string(),
}),
},
},
},
});
const taskValidator = new TaskValidator({
logger: mockLogger(),
definitions,
allowReadingInvalidState: false,
});
const task = taskManagerMock.createTask({ state: { foo: 'foo', bar: 'bar' } });
expect(() =>
taskValidator.getValidatedTaskInstanceForUpdating(task)
).toThrowErrorMatchingInlineSnapshot(`"[bar]: definition for this key is missing"`);
});
});
});
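The tests above pivot on how unknown state fields are treated: dropped on read (`unknowns: 'ignore'`, the downgrade scenario) but rejected on write (`unknowns: 'forbid'`). A minimal dependency-free sketch of that asymmetry, with a list of known keys standing in for the extended `@kbn/config-schema` schema:

```typescript
// Sketch of the read/write asymmetry for unknown state fields. The real
// code extends an ObjectType with `extendsDeep({ unknowns })`.
type TaskState = Record<string, unknown>;

function validateState(
  state: TaskState,
  knownKeys: string[],
  unknowns: 'ignore' | 'forbid'
): TaskState {
  const result: TaskState = {};
  for (const [key, value] of Object.entries(state)) {
    if (!knownKeys.includes(key)) {
      if (unknowns === 'forbid') {
        // Mirrors the write-path error from the last test above.
        throw new Error(`[${key}]: definition for this key is missing`);
      }
      continue; // read path: silently drop unknown fields
    }
    result[key] = value;
  }
  return result;
}

// Reading drops the unknown `bar` field...
console.log(JSON.stringify(validateState({ foo: 'foo', bar: 'bar' }, ['foo'], 'ignore')));
// -> {"foo":"foo"}
// ...while writing the same state throws.
```

Forbidding unknowns on write keeps task runners honest at development time, while ignoring them on read lets a downgraded Kibana run against documents written by a newer version.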


@@ -0,0 +1,205 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { max, memoize } from 'lodash';
import type { Logger } from '@kbn/core/server';
import type { ObjectType } from '@kbn/config-schema';
import { TaskTypeDictionary } from './task_type_dictionary';
import type { TaskInstance, ConcreteTaskInstance, TaskDefinition } from './task';
interface TaskValidatorOpts {
allowReadingInvalidState: boolean;
definitions: TaskTypeDictionary;
logger: Logger;
}
type LatestStateSchema =
| undefined
| {
schema: ObjectType;
version: number;
up: (state: Record<string, unknown>) => Record<string, unknown>;
};
export class TaskValidator {
private readonly logger: Logger;
private readonly definitions: TaskTypeDictionary;
private readonly allowReadingInvalidState: boolean;
private readonly cachedGetLatestStateSchema: (taskTypeDef: TaskDefinition) => LatestStateSchema;
private readonly cachedExtendSchema: typeof extendSchema;
constructor({ definitions, allowReadingInvalidState, logger }: TaskValidatorOpts) {
this.logger = logger;
this.definitions = definitions;
this.allowReadingInvalidState = allowReadingInvalidState;
this.cachedGetLatestStateSchema = memoize(
getLatestStateSchema,
(taskTypeDef) => taskTypeDef.type
);
this.cachedExtendSchema = memoize(
extendSchema,
// We need to cache two outcomes per task type (unknowns: ignore and unknowns: forbid)
(options) => `${options.taskType}|unknowns:${options.unknowns}`
);
}
public getValidatedTaskInstanceFromReading<T extends TaskInstance>(
task: T,
options: { validate: boolean } = { validate: true }
): T {
if (!options.validate) {
return task;
}
// In the scenario the task is unused / deprecated and Kibana needs to manipulate the task,
// we'll do a pass-through for those
if (!this.definitions.has(task.taskType)) {
return task;
}
const taskTypeDef = this.definitions.get(task.taskType);
const latestStateSchema = this.cachedGetLatestStateSchema(taskTypeDef);
// TODO: Remove once all task types have defined their state schema.
// https://github.com/elastic/kibana/issues/159347
// Otherwise, failures on read / write would occur. (don't forget to unskip test)
if (!latestStateSchema) {
return task;
}
let state = task.state;
try {
state = this.getValidatedStateSchema(
this.migrateTaskState(task.state, task.stateVersion, taskTypeDef, latestStateSchema),
task.taskType,
latestStateSchema,
'ignore'
);
} catch (e) {
if (!this.allowReadingInvalidState) {
throw e;
}
this.logger.debug(
`[${task.taskType}][${task.id}] Failed to validate the task's state. Allowing read operation to proceed because allow_reading_invalid_state is true. Error: ${e.message}`
);
}
return {
...task,
state,
};
}
public getValidatedTaskInstanceForUpdating<T extends TaskInstance>(
task: T,
options: { validate: boolean } = { validate: true }
): T {
if (!options.validate) {
return task;
}
// In the scenario the task is unused / deprecated and Kibana needs to manipulate the task,
// we'll do a pass-through for those
if (!this.definitions.has(task.taskType)) {
return task;
}
const taskTypeDef = this.definitions.get(task.taskType);
const latestStateSchema = this.cachedGetLatestStateSchema(taskTypeDef);
// TODO: Remove once all task types have defined their state schema.
// https://github.com/elastic/kibana/issues/159347
// Otherwise, failures on read / write would occur. (don't forget to unskip test)
if (!latestStateSchema) {
return task;
}
// We are doing a write operation which must validate against the latest state schema
return {
...task,
state: this.getValidatedStateSchema(task.state, task.taskType, latestStateSchema, 'forbid'),
stateVersion: latestStateSchema?.version,
};
}
private migrateTaskState(
state: ConcreteTaskInstance['state'],
currentVersion: number | undefined,
taskTypeDef: TaskDefinition,
latestStateSchema: LatestStateSchema
) {
if (!latestStateSchema || (currentVersion && currentVersion >= latestStateSchema.version)) {
return state;
}
let migratedState = state;
for (let i = currentVersion || 1; i <= latestStateSchema.version; i++) {
if (!taskTypeDef.stateSchemaByVersion || !taskTypeDef.stateSchemaByVersion[`${i}`]) {
throw new Error(
`[TaskValidator] state schema for ${taskTypeDef.type} missing version: ${i}`
);
}
migratedState = taskTypeDef.stateSchemaByVersion[i].up(migratedState);
try {
taskTypeDef.stateSchemaByVersion[i].schema.validate(migratedState);
} catch (e) {
throw new Error(
`[TaskValidator] failed to migrate to version ${i} because the data returned from the up migration doesn't match the schema: ${e.message}`
);
}
}
return migratedState;
}
private getValidatedStateSchema(
state: ConcreteTaskInstance['state'],
taskType: string,
latestStateSchema: LatestStateSchema,
unknowns: 'forbid' | 'ignore'
): ConcreteTaskInstance['state'] {
if (!latestStateSchema) {
throw new Error(
`[TaskValidator] stateSchemaByVersion not defined for task type: ${taskType}`
);
}
return this.cachedExtendSchema({ unknowns, taskType, latestStateSchema }).validate(state);
}
}
function extendSchema(options: {
latestStateSchema: LatestStateSchema;
unknowns: 'forbid' | 'ignore';
taskType: string;
}) {
if (!options.latestStateSchema) {
throw new Error(
`[TaskValidator] stateSchemaByVersion not defined for task type: ${options.taskType}`
);
}
return options.latestStateSchema.schema.extendsDeep({ unknowns: options.unknowns });
}
function getLatestStateSchema(taskTypeDef: TaskDefinition): LatestStateSchema {
if (!taskTypeDef.stateSchemaByVersion) {
return;
}
const versions = Object.keys(taskTypeDef.stateSchemaByVersion).map((v) => parseInt(v, 10));
const latest = max(versions);
if (latest === undefined) {
return;
}
return {
version: latest,
schema: taskTypeDef.stateSchemaByVersion[latest].schema,
up: taskTypeDef.stateSchemaByVersion[latest].up,
};
}
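The heart of the validator is the sequential migration chain in `migrateTaskState`: every `up` migration from the document's version through the latest runs in order, and a gap in the chain throws. A dependency-free sketch of just that loop (schema validation elided):

```typescript
// Simplified sketch of the migrateTaskState loop: apply each `up`
// migration in version order, throwing if a version is missing.
type TaskState = Record<string, unknown>;

function migrateState(
  state: TaskState,
  currentVersion: number | undefined,
  upByVersion: Record<number, (state: TaskState) => TaskState>
): TaskState {
  const latest = Math.max(...Object.keys(upByVersion).map((v) => parseInt(v, 10)));
  if (currentVersion && currentVersion >= latest) {
    return state;
  }
  let migrated = state;
  // Documents without a stateVersion start the chain at version 1.
  for (let i = currentVersion || 1; i <= latest; i++) {
    if (!upByVersion[i]) {
      throw new Error(`state schema missing version: ${i}`);
    }
    migrated = upByVersion[i](migrated);
  }
  return migrated;
}

const migrations: Record<number, (s: TaskState) => TaskState> = {
  1: (s) => ({ foo: s.foo ?? 'foo' }),
  2: (s) => ({ ...s, baz: 'baz' }),
};

console.log(JSON.stringify(migrateState({}, undefined, migrations)));
// -> {"foo":"foo","baz":"baz"}
```

Because the real implementation validates after each step, a broken `up` migration fails loudly with the version that produced bad data rather than surfacing as an opaque validation error at the end.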


@@ -19,7 +19,8 @@
"@kbn/es-types",
"@kbn/apm-utils",
"@kbn/core-saved-objects-common",
"@kbn/core-saved-objects-utils-server"
"@kbn/core-saved-objects-utils-server",
"@kbn/core-test-helpers-kbn-server"
],
"exclude": [
"target/**/*",