[Response Ops][Task Manager] Run tasks as the current user (#205577)

## Summary

Resolves: https://github.com/elastic/kibana/issues/190661

This PR allows tasks to be ran scoped as the current user. It
accomplishes this by creating an API when the user schedules this task.
It will then persist the API key in the task instance which allows the
task handler to have access to the user's privileges.

To test: 

1. Run `yarn start --run-examples`
2. Navigate to
`http://localhost:5601/app/triggersActionsUiExample/task_manager_with_api_key`
3. Click on `Schedule Task 1 and 2`, this will schedule a mock test that
is defined in
`x-pack/platform/plugins/shared/alerting/server/plugin.ts`, which prints
out the task instance and the scoped clients.
4. Click on `Remove` to cancel the task, which then cleans up and
invalidates the API keys.
5. Reschedule the 2 tasks and click on `Remove All Tasks` which will
bulk remove tasks and invalidate API keys.

Limitation of this approach:
- Because we depend on a request, this would mean every schedule that is
tied to a user needs to be triggered using a request. (Unless we have a
way of generating an API key without using the request).

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Jiawei Wu 2025-04-02 19:09:07 -06:00 committed by GitHub
parent 8c4dc2e77b
commit 4bdea60433
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 1992 additions and 138 deletions

2
.github/CODEOWNERS vendored
View file

@ -444,6 +444,7 @@ src/platform/packages/shared/kbn-doc-links @elastic/docs
src/platform/packages/shared/kbn-dom-drag-drop @elastic/kibana-visualizations @elastic/kibana-data-discovery
src/platform/packages/shared/kbn-ebt-tools @elastic/kibana-core
src/platform/packages/shared/kbn-elastic-agent-utils @elastic/obs-ux-logs-team
src/platform/packages/shared/kbn-encrypted-saved-objects-shared @elastic/kibana-security
src/platform/packages/shared/kbn-es @elastic/kibana-operations
src/platform/packages/shared/kbn-es-archiver @elastic/kibana-operations @elastic/appex-qa
src/platform/packages/shared/kbn-es-errors @elastic/kibana-core
@ -883,6 +884,7 @@ x-pack/platform/plugins/private/reporting @elastic/appex-sharedux
x-pack/platform/plugins/private/rollup @elastic/kibana-management
x-pack/platform/plugins/private/runtime_fields @elastic/kibana-management
x-pack/platform/plugins/private/snapshot_restore @elastic/kibana-management
x-pack/platform/plugins/private/task_manager_dependencies @elastic/response-ops
x-pack/platform/plugins/private/telemetry_collection_xpack @elastic/kibana-core
x-pack/platform/plugins/private/transform @elastic/ml-ui
x-pack/platform/plugins/private/translations @elastic/kibana-localization

View file

@ -229,6 +229,7 @@ mapped_pages:
| [streamsAppWrapper](https://github.com/elastic/kibana/blob/main/x-pack/solutions/observability/plugins/observability_streams_wrapper/README.md) | Observability-specific wrapper for the streams app. This is responsible for actually registering the app and making it accessible in observability contexts |
| [synthetics](https://github.com/elastic/kibana/blob/main/x-pack/solutions/observability/plugins/synthetics/README.md) | The purpose of this plugin is to provide users of Heartbeat more visibility of what's happening in their infrastructure. |
| [taskManager](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/task_manager/README.md) | The task manager is a generic system for running background tasks. |
| [taskManagerDependencies](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/private/task_manager_dependencies/README.md) | This plugin is used as a temporary sidecar plugin to enable the task manager plugin access to the encrypted saved objects client as there is a circular dependency if the task manager were to require the encrypted saved objects plugin directly. |
| [telemetryCollectionXpack](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/private/telemetry_collection_xpack/README.md) | Gathers all usage collection, retrieving them from both: OSS and X-Pack plugins. |
| [threatIntelligence](https://github.com/elastic/kibana/blob/main/x-pack/solutions/security/plugins/threat_intelligence/README.md) | Elastic Threat Intelligence makes it easy to analyze and investigate potential security threats by aggregating data from multiple sources in one place. Youll be able to view data from all activated threat intelligence feeds and take action. |
| [timelines](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/timelines/README.md) | Timelines is a plugin that provides a grid component with accompanying server side apis to help users identify events of interest and perform root cause analysis within Kibana. |

View file

@ -487,6 +487,7 @@
"@kbn/embeddable-plugin": "link:src/platform/plugins/shared/embeddable",
"@kbn/embedded-lens-example-plugin": "link:x-pack/examples/embedded_lens_example",
"@kbn/encrypted-saved-objects-plugin": "link:x-pack/platform/plugins/shared/encrypted_saved_objects",
"@kbn/encrypted-saved-objects-shared": "link:src/platform/packages/shared/kbn-encrypted-saved-objects-shared",
"@kbn/enterprise-search-plugin": "link:x-pack/solutions/search/plugins/enterprise_search",
"@kbn/entities-data-access-plugin": "link:x-pack/solutions/observability/plugins/entities_data_access",
"@kbn/entities-schema": "link:x-pack/platform/packages/shared/kbn-entities-schema",
@ -959,6 +960,7 @@
"@kbn/streams-plugin": "link:x-pack/platform/plugins/shared/streams",
"@kbn/streams-schema": "link:x-pack/platform/packages/shared/kbn-streams-schema",
"@kbn/synthetics-plugin": "link:x-pack/solutions/observability/plugins/synthetics",
"@kbn/task-manager-dependencies-plugin": "link:x-pack/platform/plugins/private/task_manager_dependencies",
"@kbn/task-manager-fixture-plugin": "link:x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture",
"@kbn/task-manager-performance-plugin": "link:x-pack/test/plugin_api_perf/plugins/task_manager_performance",
"@kbn/task-manager-plugin": "link:x-pack/platform/plugins/shared/task_manager",

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
export * from './src/encrypted_saved_objects_client_types';

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
module.exports = {
preset: '@kbn/test',
rootDir: '../../../../..',
roots: ['<rootDir>/src/platform/packages/shared/kbn-encrypted-saved-objects-shared'],
};

View file

@ -0,0 +1,9 @@
{
"type": "shared-common",
"id": "@kbn/encrypted-saved-objects-shared",
"owner": [
"@elastic/kibana-security"
],
"group": "platform",
"visibility": "shared"
}

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import type {
ISavedObjectsPointInTimeFinder,
SavedObject,
SavedObjectsBaseOptions,
SavedObjectsCreatePointInTimeFinderDependencies,
SavedObjectsCreatePointInTimeFinderOptions,
} from '@kbn/core/server';
export interface EncryptedSavedObjectsClient {
getDecryptedAsInternalUser: <T = unknown>(
type: string,
id: string,
options?: SavedObjectsBaseOptions
) => Promise<SavedObject<T>>;
/**
* API method, that can be used to help page through large sets of saved objects and returns decrypted properties in result SO.
* Its interface matches interface of the corresponding Saved Objects API `createPointInTimeFinder` method:
*
* @example
* ```ts
* const finder = await this.encryptedSavedObjectsClient.createPointInTimeFinderDecryptedAsInternalUser({
* filter,
* type: 'my-saved-object-type',
* perPage: 1000,
* });
* for await (const response of finder.find()) {
* // process response
* }
* ```
*
* @param findOptions matches interface of corresponding argument of Saved Objects API `createPointInTimeFinder` {@link SavedObjectsCreatePointInTimeFinderOptions}
* @param dependencies matches interface of corresponding argument of Saved Objects API `createPointInTimeFinder` {@link SavedObjectsCreatePointInTimeFinderDependencies}
*
*/
createPointInTimeFinderDecryptedAsInternalUser<T = unknown, A = unknown>(
findOptions: SavedObjectsCreatePointInTimeFinderOptions,
dependencies?: SavedObjectsCreatePointInTimeFinderDependencies
): Promise<ISavedObjectsPointInTimeFinder<T, A>>;
}
export interface EncryptedSavedObjectsClientOptions {
includedHiddenTypes?: string[];
}

View file

@ -0,0 +1,19 @@
{
"extends": "../../../../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types",
"types": [
"jest",
"node"
]
},
"include": [
"**/*.ts"
],
"exclude": [
"target/**/*"
],
"kbn_references": [
"@kbn/core",
]
}

View file

@ -832,6 +832,8 @@
"@kbn/embedded-lens-example-plugin/*": ["x-pack/examples/embedded_lens_example/*"],
"@kbn/encrypted-saved-objects-plugin": ["x-pack/platform/plugins/shared/encrypted_saved_objects"],
"@kbn/encrypted-saved-objects-plugin/*": ["x-pack/platform/plugins/shared/encrypted_saved_objects/*"],
"@kbn/encrypted-saved-objects-shared": ["src/platform/packages/shared/kbn-encrypted-saved-objects-shared"],
"@kbn/encrypted-saved-objects-shared/*": ["src/platform/packages/shared/kbn-encrypted-saved-objects-shared/*"],
"@kbn/enterprise-search-plugin": ["x-pack/solutions/search/plugins/enterprise_search"],
"@kbn/enterprise-search-plugin/*": ["x-pack/solutions/search/plugins/enterprise_search/*"],
"@kbn/entities-data-access-plugin": ["x-pack/solutions/observability/plugins/entities_data_access"],
@ -1948,6 +1950,8 @@
"@kbn/synthetics-plugin/*": ["x-pack/solutions/observability/plugins/synthetics/*"],
"@kbn/synthetics-private-location": ["x-pack/packages/kbn-synthetics-private-location"],
"@kbn/synthetics-private-location/*": ["x-pack/packages/kbn-synthetics-private-location/*"],
"@kbn/task-manager-dependencies-plugin": ["x-pack/platform/plugins/private/task_manager_dependencies"],
"@kbn/task-manager-dependencies-plugin/*": ["x-pack/platform/plugins/private/task_manager_dependencies/*"],
"@kbn/task-manager-fixture-plugin": ["x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture"],
"@kbn/task-manager-fixture-plugin/*": ["x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture/*"],
"@kbn/task-manager-performance-plugin": ["x-pack/test/plugin_api_perf/plugins/task_manager_performance"],

View file

@ -18,6 +18,7 @@
"dataViews",
"dataViewEditor",
"unifiedSearch",
"taskManager",
"fieldFormats",
"licensing",
"fieldsMetadata"

View file

@ -40,6 +40,7 @@ import { RuleStatusDropdownSandbox } from './components/rule_status_dropdown_san
import { RuleStatusFilterSandbox } from './components/rule_status_filter_sandbox';
import { AlertsTableSandbox } from './components/alerts_table_sandbox';
import { RulesSettingsLinkSandbox } from './components/rules_settings_link_sandbox';
import { TaskWithApiKeySandbox } from './components/task_with_api_key_sandbox';
export interface TriggersActionsUiExampleComponentParams {
http: CoreStart['http'];
@ -248,6 +249,15 @@ const TriggersActionsUiExampleApp = ({
</Page>
)}
/>
<Route
exact
path="/task_manager_with_api_key"
render={() => (
<Page title="Task Manager with API Key">
<TaskWithApiKeySandbox http={http} />
</Page>
)}
/>
</Routes>
</EuiPage>
</Router>

View file

@ -91,6 +91,17 @@ export const Sidebar = ({ history }: { history: ScopedHistory }) => {
},
],
},
{
name: 'Task Manager with API key',
id: 'task-manager-with-api-key',
items: [
{
id: 'task-manager-with-api-key-page',
name: 'Task Manager with API Key',
onClick: () => history.push('/task_manager_with_api_key'),
},
],
},
]}
/>
</EuiPageSidebar>

View file

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React from 'react';
import { EuiButton } from '@elastic/eui';
import { HttpSetup } from '@kbn/core/public';
export interface TaskWithApiKeySandboxProps {
http: HttpSetup;
}
export const TaskWithApiKeySandbox = (props: TaskWithApiKeySandboxProps) => {
const { http } = props;
return (
<div>
<div>
<EuiButton
onClick={() => {
http.get('/api/triggers_actions_ui_example/schedule_task_with_api_key/task1');
}}
>
Schedule Task 1
</EuiButton>
</div>
<div>
<EuiButton
onClick={() => {
http.get('/api/triggers_actions_ui_example/schedule_task_with_api_key/task2');
}}
>
Schedule Task 2
</EuiButton>
</div>
<div>
<EuiButton
onClick={() => {
http.get('/api/triggers_actions_ui_example/remove_task_with_api_key/task1');
}}
>
Remove Task 1
</EuiButton>
<EuiButton
onClick={() => {
http.get('/api/triggers_actions_ui_example/remove_task_with_api_key/task2');
}}
>
Remove Task 2
</EuiButton>
</div>
<div>
<EuiButton
onClick={() => {
http.post('/api/triggers_actions_ui_example/bulk_remove_task_with_api_key', {
body: JSON.stringify({ ids: ['task1', 'task2'] }),
});
}}
>
Remove All Tasks
</EuiButton>
</div>
</div>
);
};

View file

@ -5,27 +5,165 @@
* 2.0.
*/
import { Plugin, CoreSetup } from '@kbn/core/server';
import { Plugin, CoreSetup, FakeRawRequest } from '@kbn/core/server';
import { PluginSetupContract as ActionsSetup } from '@kbn/actions-plugin/server';
import { AlertingServerSetup } from '@kbn/alerting-plugin/server';
import { addSpaceIdToPath } from '@kbn/spaces-plugin/common';
import { kibanaRequestFactory } from '@kbn/core-http-server-utils';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import { schema } from '@kbn/config-schema';
import {
getConnectorType as getSystemLogExampleConnectorType,
connectorAdapter as systemLogConnectorAdapter,
} from './connector_types/system_log_example';
// this plugin's dependencies
export interface TriggersActionsUiExampleDeps {
export interface TriggersActionsUiExampleSetupDeps {
alerting: AlertingServerSetup;
actions: ActionsSetup;
taskManager: TaskManagerSetupContract;
}
export interface TriggersActionsUiStartDeps {
taskManager: TaskManagerStartContract;
}
export class TriggersActionsUiExamplePlugin
implements Plugin<void, void, TriggersActionsUiExampleDeps>
implements Plugin<void, void, TriggersActionsUiExampleSetupDeps, TriggersActionsUiStartDeps>
{
public setup(core: CoreSetup, { actions, alerting }: TriggersActionsUiExampleDeps) {
public setup(
core: CoreSetup<TriggersActionsUiStartDeps>,
{ actions, alerting, taskManager }: TriggersActionsUiExampleSetupDeps
) {
actions.registerType(getSystemLogExampleConnectorType());
alerting.registerConnectorAdapter(systemLogConnectorAdapter);
const router = core.http.createRouter();
router.get(
{
path: '/api/triggers_actions_ui_example/schedule_task_with_api_key/{id}',
validate: {
params: schema.object({
id: schema.string(),
}),
},
security: {
authz: {
enabled: false,
reason: 'This route is opted out from authorization because it is an dev example',
},
},
},
async (context, request, res) => {
const { taskManager: taskManagerStart } = (await core.getStartServices())[1];
const id = request.params.id;
await taskManagerStart.schedule(
{
id,
taskType: 'taskWithApiKey',
params: {},
state: {},
schedule: {
interval: '30s',
},
enabled: true,
},
{
request,
}
);
return res.ok();
}
);
router.get(
{
path: '/api/triggers_actions_ui_example/remove_task_with_api_key/{id}',
validate: {
params: schema.object({
id: schema.string(),
}),
},
security: {
authz: {
enabled: false,
reason: 'This route is opted out from authorization because it is an dev example',
},
},
},
async (context, request, res) => {
const { taskManager: taskManagerStart } = (await core.getStartServices())[1];
const id = request.params.id;
try {
await taskManagerStart.remove(id);
// eslint-disable-next-line no-empty
} catch (e) {}
return res.ok();
}
);
router.post(
{
path: '/api/triggers_actions_ui_example/bulk_remove_task_with_api_key',
validate: {
body: schema.object({
ids: schema.arrayOf(schema.string()),
}),
},
security: {
authz: {
enabled: false,
reason: 'This route is opted out from authorization because it is an dev example',
},
},
},
async (context, request, res) => {
const { taskManager: taskManagerStart } = (await core.getStartServices())[1];
const ids = request.body.ids;
try {
await taskManagerStart.bulkRemove(ids);
// eslint-disable-next-line no-empty
} catch (e) {}
return res.ok();
}
);
taskManager.registerTaskDefinitions({
taskWithApiKey: {
title: 'taskWithApiKey',
createTaskRunner: ({ taskInstance }) => ({
run: async () => {
const services = await core.getStartServices();
const fakeRawRequest: FakeRawRequest = {
headers: {
authorization: `ApiKey ${taskInstance?.apiKey}`,
},
path: '/',
};
const path = addSpaceIdToPath('/', taskInstance.userScope?.spaceId || 'default');
// Fake request from the API key
const fakeRequest = kibanaRequestFactory(fakeRawRequest);
services[0].http.basePath.set(fakeRequest, path);
return {
state: {},
};
},
cancel: async () => {},
}),
},
});
}
public start() {}

View file

@ -34,6 +34,9 @@
"@kbn/field-formats-plugin",
"@kbn/licensing-plugin",
"@kbn/response-ops-rule-form",
"@kbn/spaces-plugin",
"@kbn/core-http-server-utils",
"@kbn/task-manager-plugin",
"@kbn/fields-metadata-plugin",
]
}

View file

@ -0,0 +1,9 @@
# Task Manager Dependencies
This plugin is used as a temporary sidecar plugin to enable the task manager plugin access to
the encrypted saved objects client as there is a circular dependency if the task manager were to
require the encrypted saved objects plugin directly.
This is because the encrypted saved objects plugin has a dependency on the security plugin, which
has a dependency on the task manager plugin. In the future we can remove this plugin when we
extract the task manager related code from the security plugin into another plugin.

View file

@ -0,0 +1,23 @@
{
"type": "plugin",
"id": "@kbn/task-manager-dependencies-plugin",
"owner": [
"@elastic/response-ops"
],
"group": "platform",
"visibility": "private",
"description": "Temporary sidecar plugin allowing task manager to register the encrypted saved objects dependency, bypassing circular dependencies.",
"plugin": {
"id": "taskManagerDependencies",
"browser": false,
"server": true,
"configPath": [
"xpack",
"task_manager_dependencies",
],
"requiredPlugins": [
"taskManager",
"encryptedSavedObjects",
],
},
}

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export async function plugin() {
const { TaskManagerDependenciesPlugin } = await import('./plugin');
return new TaskManagerDependenciesPlugin();
}

View file

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { CoreSetup, CoreStart } from '@kbn/core/server';
import type {
EncryptedSavedObjectsPluginSetup,
EncryptedSavedObjectsPluginStart,
} from '@kbn/encrypted-saved-objects-plugin/server';
import type {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
export interface TaskManagerDependenciesPluginSetup {
encryptedSavedObjects: EncryptedSavedObjectsPluginSetup;
taskManager: TaskManagerSetupContract;
}
export interface TaskManagerDependenciesPluginStart {
encryptedSavedObjects: EncryptedSavedObjectsPluginStart;
taskManager: TaskManagerStartContract;
}
export class TaskManagerDependenciesPlugin {
public setup(core: CoreSetup, plugin: TaskManagerDependenciesPluginSetup) {
plugin.encryptedSavedObjects.registerType({
type: 'task',
attributesToEncrypt: new Set(['apiKey']),
attributesToIncludeInAAD: new Set(['id', 'taskType']),
enforceRandomId: false,
});
plugin.taskManager.registerCanEncryptedSavedObjects(plugin.encryptedSavedObjects.canEncrypt);
}
public start(core: CoreStart, plugin: TaskManagerDependenciesPluginStart) {
plugin.taskManager.registerEncryptedSavedObjectsClient(
plugin.encryptedSavedObjects.getClient({
includedHiddenTypes: ['task'],
})
);
}
}

View file

@ -0,0 +1,17 @@
{
"extends": "../../../../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types"
},
"include": [
"server/**/*",
],
"kbn_references": [
"@kbn/core",
"@kbn/encrypted-saved-objects-plugin",
"@kbn/task-manager-plugin",
],
"exclude": [
"target/**/*",
]
}

View file

@ -18,6 +18,10 @@ import type {
SavedObjectsServiceSetup,
StartServicesAccessor,
} from '@kbn/core/server';
import type {
EncryptedSavedObjectsClient,
EncryptedSavedObjectsClientOptions,
} from '@kbn/encrypted-saved-objects-shared';
import type { PublicMethodsOf } from '@kbn/utility-types';
import { getDescriptorNamespace, normalizeNamespace } from './get_descriptor_namespace';
@ -25,6 +29,7 @@ import { SavedObjectsEncryptionExtension } from './saved_objects_encryption_exte
import type { EncryptedSavedObjectsService } from '../crypto';
export { normalizeNamespace };
export type { EncryptedSavedObjectsClient, EncryptedSavedObjectsClientOptions };
interface SetupSavedObjectsParams {
service: PublicMethodsOf<EncryptedSavedObjectsService>;
@ -36,43 +41,6 @@ export type ClientInstanciator = (
options?: EncryptedSavedObjectsClientOptions
) => EncryptedSavedObjectsClient;
export interface EncryptedSavedObjectsClientOptions {
includedHiddenTypes?: string[];
}
export interface EncryptedSavedObjectsClient {
getDecryptedAsInternalUser: <T = unknown>(
type: string,
id: string,
options?: SavedObjectsBaseOptions
) => Promise<SavedObject<T>>;
/**
* API method, that can be used to help page through large sets of saved objects and returns decrypted properties in result SO.
* Its interface matches interface of the corresponding Saved Objects API `createPointInTimeFinder` method:
*
* @example
* ```ts
* const finder = await this.encryptedSavedObjectsClient.createPointInTimeFinderDecryptedAsInternalUser({
* filter,
* type: 'my-saved-object-type',
* perPage: 1000,
* });
* for await (const response of finder.find()) {
* // process response
* }
* ```
*
* @param findOptions matches interface of corresponding argument of Saved Objects API `createPointInTimeFinder` {@link SavedObjectsCreatePointInTimeFinderOptions}
* @param dependencies matches interface of corresponding argument of Saved Objects API `createPointInTimeFinder` {@link SavedObjectsCreatePointInTimeFinderDependencies}
*
*/
createPointInTimeFinderDecryptedAsInternalUser<T = unknown, A = unknown>(
findOptions: SavedObjectsCreatePointInTimeFinderOptions,
dependencies?: SavedObjectsCreatePointInTimeFinderDependencies
): Promise<ISavedObjectsPointInTimeFinder<T, A>>;
}
export function setupSavedObjects({
service,
savedObjects,

View file

@ -15,6 +15,7 @@
"@kbn/core-security-common",
"@kbn/test-jest-helpers",
"@kbn/config",
"@kbn/encrypted-saved-objects-shared",
],
"exclude": [
"target/**/*",

View file

@ -16,7 +16,8 @@
],
"optionalPlugins": [
"cloud",
"usageCollection"
"usageCollection",
"spaces"
]
}
}

View file

@ -0,0 +1,289 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
isRequestApiKeyType,
getApiKeyFromRequest,
createApiKey,
getApiKeyAndUserScope,
} from './api_key_utils';
import { coreMock } from '@kbn/core/server/mocks';
import { httpServerMock } from '@kbn/core-http-server-mocks';
import type { SpacesPluginStart } from '@kbn/spaces-plugin/server';
import { spacesMock } from '@kbn/spaces-plugin/server/mocks';
import type { AuthenticatedUser } from '@kbn/core/server';
const mockTask = {
id: 'task',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
};
describe('api_key_utils', () => {
describe('isRequestApiKeyType', () => {
test('should return true if the request is made by a API key', () => {
const mockUser = { authentication_type: 'api_key' } as AuthenticatedUser;
expect(isRequestApiKeyType(mockUser)).toBeTruthy();
});
test('should return false if the request is made by a user', () => {
const mockUser = { authentication_type: 'basic' } as AuthenticatedUser;
expect(isRequestApiKeyType(mockUser)).toBeFalsy();
});
});
describe('getApiKeyFromRequest', () => {
test('should return the API key from a request', () => {
const mockApiKey = Buffer.from('apiKeyId:apiKey').toString('base64');
const request = httpServerMock.createKibanaRequest({
headers: {
authorization: `ApiKey: ${mockApiKey}`,
},
});
const result = getApiKeyFromRequest(request);
expect(result).toEqual({ id: 'apiKeyId', api_key: 'apiKey' });
});
test('should return null if request is missing the authorization header', () => {
const request = httpServerMock.createKibanaRequest();
const result = getApiKeyFromRequest(request);
expect(result).toBeNull();
});
});
describe('createApiKey', () => {
test('should create the API key if the request was made by the client', async () => {
const request = httpServerMock.createKibanaRequest();
const coreStart = coreMock.createStart();
const mockUser = {
authentication_type: 'basic',
username: 'testUser',
};
coreStart.security.authc.apiKeys.areAPIKeysEnabled = jest.fn().mockReturnValueOnce(true);
coreStart.security.authc.getCurrentUser = jest.fn().mockReturnValueOnce(mockUser);
coreStart.security.authc.apiKeys.grantAsInternalUser = jest.fn().mockResolvedValueOnce({
id: 'apiKeyId',
name: 'TaskManager: testUser',
api_key: 'apiKey',
});
const result = await createApiKey([mockTask], request, true, coreStart.security);
const apiKeyResult = result.get('task');
const decodedApiKey = Buffer.from(apiKeyResult!.apiKey, 'base64').toString();
expect(decodedApiKey).toEqual('apiKeyId:apiKey');
expect(coreStart.security.authc.apiKeys.areAPIKeysEnabled).toHaveBeenCalled();
expect(coreStart.security.authc.getCurrentUser).toHaveBeenCalledWith(request);
expect(coreStart.security.authc.apiKeys.grantAsInternalUser).toHaveBeenCalledWith(request, {
name: 'TaskManager: report - testUser',
role_descriptors: {},
metadata: { managed: true },
});
});
test('should return the API key if the request was made by API key', async () => {
const mockApiKey = Buffer.from('apiKeyId:apiKey').toString('base64');
const request = httpServerMock.createKibanaRequest({
headers: {
authorization: `ApiKey: ${mockApiKey}`,
},
});
const coreStart = coreMock.createStart();
const mockUser = {
authentication_type: 'api_key',
username: 'testUser',
};
coreStart.security.authc.apiKeys.areAPIKeysEnabled = jest.fn().mockReturnValueOnce(true);
coreStart.security.authc.getCurrentUser = jest.fn().mockReturnValue(mockUser);
const result = await createApiKey([mockTask], request, true, coreStart.security);
const apiKeyResult = result.get('task');
const decodedApiKey = Buffer.from(apiKeyResult!.apiKey, 'base64').toString();
expect(decodedApiKey).toEqual('apiKeyId:apiKey');
expect(coreStart.security.authc.apiKeys.areAPIKeysEnabled).toHaveBeenCalled();
expect(coreStart.security.authc.getCurrentUser).toHaveBeenCalledWith(request);
expect(coreStart.security.authc.apiKeys.grantAsInternalUser).not.toHaveBeenCalled();
});
test('should throw if canEncryptSo is false', async () => {
const request = httpServerMock.createKibanaRequest();
const coreStart = coreMock.createStart();
await expect(
createApiKey([mockTask], request, false, coreStart.security)
).rejects.toMatchObject({
message:
'Unable to create API keys because the Encrypted Saved Objects plugin has not been registered or is missing encryption key.',
});
});
test('should throw if API keys are not enabled', async () => {
const request = httpServerMock.createKibanaRequest();
const coreStart = coreMock.createStart();
coreStart.security.authc.apiKeys.areAPIKeysEnabled = jest.fn().mockReturnValueOnce(false);
coreStart.security.authc.getCurrentUser = jest.fn().mockReturnValue(null);
await expect(
createApiKey([mockTask], request, true, coreStart.security)
).rejects.toMatchObject({
message: 'API keys are not enabled, cannot create API key.',
});
});
test('should throw if user is not authenticated', async () => {
const request = httpServerMock.createKibanaRequest();
const coreStart = coreMock.createStart();
coreStart.security.authc.apiKeys.areAPIKeysEnabled = jest.fn().mockReturnValueOnce(true);
await expect(
createApiKey([mockTask], request, true, coreStart.security)
).rejects.toMatchObject({
message: 'Cannot authenticate current user.',
});
});
test('should throw if API key was not created', async () => {
const request = httpServerMock.createKibanaRequest();
const coreStart = coreMock.createStart();
const mockUser = {
authentication_type: 'basic',
username: 'testUser',
};
coreStart.security.authc.apiKeys.areAPIKeysEnabled = jest.fn().mockReturnValueOnce(true);
coreStart.security.authc.getCurrentUser = jest.fn().mockReturnValueOnce(mockUser);
coreStart.security.authc.apiKeys.grantAsInternalUser = jest.fn().mockResolvedValueOnce(null);
await expect(
createApiKey([mockTask], request, true, coreStart.security)
).rejects.toMatchObject({
message: 'Could not create API key.',
});
});
});
describe('getUserScope', () => {
test('should return the users scope based on their request', async () => {
const request = httpServerMock.createKibanaRequest();
const coreStart = coreMock.createStart();
const spacesStart: jest.Mocked<SpacesPluginStart> = spacesMock.createStart();
spacesStart.spacesService.getActiveSpace = jest.fn().mockResolvedValue({
id: 'testSpace',
});
const mockUser = {
authentication_type: 'basic',
username: 'testUser',
};
coreStart.security.authc.apiKeys.areAPIKeysEnabled = jest.fn().mockReturnValueOnce(true);
coreStart.security.authc.getCurrentUser = jest.fn().mockReturnValueOnce(mockUser);
coreStart.security.authc.apiKeys.grantAsInternalUser = jest.fn().mockResolvedValueOnce({
id: 'apiKeyId',
name: 'TaskManager: testUser',
api_key: 'apiKey',
});
const result = await getApiKeyAndUserScope(
[mockTask],
request,
true,
coreStart.security,
spacesStart
);
expect(result.get('task')).toEqual({
apiKey: 'YXBpS2V5SWQ6YXBpS2V5',
userScope: {
apiKeyId: 'apiKeyId',
spaceId: 'testSpace',
apiKeyCreatedByUser: false,
},
});
});
test('should default space to default if space is not found', async () => {
const request = httpServerMock.createKibanaRequest();
const coreStart = coreMock.createStart();
const spacesStart: jest.Mocked<SpacesPluginStart> = spacesMock.createStart();
const mockUser = {
authentication_type: 'basic',
username: 'testUser',
};
coreStart.security.authc.apiKeys.areAPIKeysEnabled = jest.fn().mockReturnValueOnce(true);
coreStart.security.authc.getCurrentUser = jest.fn().mockReturnValueOnce(mockUser);
coreStart.security.authc.apiKeys.grantAsInternalUser = jest.fn().mockResolvedValueOnce({
id: 'apiKeyId',
name: 'TaskManager: testUser',
api_key: 'apiKey',
});
const result = await getApiKeyAndUserScope(
[mockTask],
request,
true,
coreStart.security,
spacesStart
);
expect(result.get('task')).toEqual({
apiKey: 'YXBpS2V5SWQ6YXBpS2V5',
userScope: {
apiKeyId: 'apiKeyId',
spaceId: 'default',
apiKeyCreatedByUser: false,
},
});
});
test('should set apiKeyCreatedByUser to true if the API key existed prior', async () => {
const mockApiKey = Buffer.from('apiKeyId:apiKey').toString('base64');
const request = httpServerMock.createKibanaRequest({
headers: {
authorization: `ApiKey: ${mockApiKey}`,
},
});
const coreStart = coreMock.createStart();
const spacesStart: jest.Mocked<SpacesPluginStart> = spacesMock.createStart();
const mockUser = {
authentication_type: 'api_key',
username: 'testUser',
};
coreStart.security.authc.apiKeys.areAPIKeysEnabled = jest.fn().mockReturnValueOnce(true);
coreStart.security.authc.getCurrentUser = jest.fn().mockReturnValue(mockUser);
const result = await getApiKeyAndUserScope(
[mockTask],
request,
true,
coreStart.security,
spacesStart
);
expect(result.get('task')).toEqual({
apiKey: 'YXBpS2V5SWQ6YXBpS2V5',
userScope: {
apiKeyId: 'apiKeyId',
spaceId: 'default',
apiKeyCreatedByUser: true,
},
});
});
});
});

View file

@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { AuthenticatedUser, SecurityServiceStart } from '@kbn/core/server';
import type { KibanaRequest } from '@kbn/core/server';
import type { SpacesPluginStart } from '@kbn/spaces-plugin/server';
import { truncate } from 'lodash';
import type { TaskInstance, TaskUserScope } from '../task';
export interface APIKeyResult {
id: string;
api_key: string;
}
export interface EncodedApiKeyResult {
apiKey: string;
apiKeyId: string;
}
export interface ApiKeyAndUserScope {
apiKey: string;
userScope: TaskUserScope;
}
const getCredentialsFromRequest = (request: KibanaRequest) => {
const authorizationHeaderValue = request.headers.authorization;
if (!authorizationHeaderValue || typeof authorizationHeaderValue !== 'string') {
return null;
}
const [scheme] = authorizationHeaderValue.split(/\s+/);
return authorizationHeaderValue.substring(scheme.length + 1);
};
export const isRequestApiKeyType = (user: AuthenticatedUser | null) => {
return user?.authentication_type === 'api_key';
};
export const getApiKeyFromRequest = (request: KibanaRequest) => {
const credentials = getCredentialsFromRequest(request);
if (credentials) {
const apiKey = Buffer.from(credentials, 'base64').toString().split(':');
return {
id: apiKey[0],
api_key: apiKey[1],
};
}
return null;
};
export const createApiKey = async (
taskInstances: TaskInstance[],
request: KibanaRequest,
canEncryptSo: boolean,
security: SecurityServiceStart
) => {
if (!canEncryptSo) {
throw Error(
'Unable to create API keys because the Encrypted Saved Objects plugin has not been registered or is missing encryption key.'
);
}
if (!(await security.authc.apiKeys.areAPIKeysEnabled())) {
throw Error('API keys are not enabled, cannot create API key.');
}
const user = security.authc.getCurrentUser(request);
if (!user) {
throw Error('Cannot authenticate current user.');
}
const apiKeyByTaskIdMap = new Map<string, EncodedApiKeyResult>();
// If the user passed in their own API key, simply return it
if (isRequestApiKeyType(user)) {
const apiKeyCreateResult = getApiKeyFromRequest(request);
if (!apiKeyCreateResult) {
throw Error('Could not create API key.');
}
const { id, api_key: apiKey } = apiKeyCreateResult;
taskInstances.forEach((task) => {
apiKeyByTaskIdMap.set(task.id!, {
apiKey: Buffer.from(`${id}:${apiKey}`).toString('base64'),
apiKeyId: apiKeyCreateResult.id,
});
});
return apiKeyByTaskIdMap;
}
// If the user did not pass in their own API key, we need to create 1 key per task
// type (due to naming requirements).
const taskTypes = [...new Set(taskInstances.map((task) => task.taskType))];
const apiKeyByTaskTypeMap = new Map<string, EncodedApiKeyResult>();
for (const taskType of taskTypes) {
const apiKeyCreateResult = await security.authc.apiKeys.grantAsInternalUser(request, {
name: truncate(`TaskManager: ${taskType} - ${user.username}`, { length: 256 }),
role_descriptors: {},
metadata: { managed: true },
});
if (!apiKeyCreateResult) {
throw Error('Could not create API key.');
}
const { id, api_key: apiKey } = apiKeyCreateResult;
apiKeyByTaskTypeMap.set(taskType, {
apiKey: Buffer.from(`${id}:${apiKey}`).toString('base64'),
apiKeyId: apiKeyCreateResult.id,
});
}
// Assign each of the created API keys to the task ID
taskInstances.forEach((task) => {
const encodedApiKeyResult = apiKeyByTaskTypeMap.get(task.taskType);
if (encodedApiKeyResult) {
apiKeyByTaskIdMap.set(task.id!, encodedApiKeyResult);
}
});
return apiKeyByTaskIdMap;
};
export const getApiKeyAndUserScope = async (
taskInstances: TaskInstance[],
request: KibanaRequest,
canEncryptSo: boolean,
security: SecurityServiceStart,
spaces?: SpacesPluginStart
): Promise<Map<string, ApiKeyAndUserScope>> => {
const apiKeyByTaskIdMap = await createApiKey(taskInstances, request, canEncryptSo, security);
const space = await spaces?.spacesService.getActiveSpace(request);
const user = security.authc.getCurrentUser(request);
const apiKeyAndUserScopeByTaskId = new Map<string, ApiKeyAndUserScope>();
taskInstances.forEach((task) => {
const encodedApiKeyResult = apiKeyByTaskIdMap.get(task.id!);
if (encodedApiKeyResult) {
apiKeyAndUserScopeByTaskId.set(task.id!, {
apiKey: encodedApiKeyResult.apiKey,
userScope: {
apiKeyId: encodedApiKeyResult.apiKeyId,
spaceId: space?.id || 'default',
// Set apiKeyCreatedByUser to true if the user passed in their own API key, since we do
// not want to invalidate a specific API key that was not created by the task manager
apiKeyCreatedByUser: isRequestApiKeyType(user),
},
});
}
});
return apiKeyAndUserScopeByTaskId;
};

View file

@ -15,6 +15,7 @@ const createSetupMock = () => {
index: '.kibana_task_manager',
addMiddleware: jest.fn(),
registerTaskDefinitions: jest.fn(),
registerCanEncryptedSavedObjects: jest.fn(),
};
return mock;
};
@ -36,6 +37,7 @@ const createStartMock = () => {
bulkEnable: jest.fn(),
getRegisteredTypes: jest.fn(),
bulkUpdateState: jest.fn(),
registerEncryptedSavedObjectsClient: jest.fn(),
};
return mock;
};

View file

@ -22,6 +22,8 @@ import type {
CoreStart,
} from '@kbn/core/server';
import type { CloudSetup, CloudStart } from '@kbn/cloud-plugin/server';
import type { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-shared';
import type { SpacesPluginStart } from '@kbn/spaces-plugin/server';
import {
registerDeleteInactiveNodesTaskDefinition,
scheduleDeleteInactiveNodesTaskDefinition,
@ -69,6 +71,7 @@ export interface TaskManagerSetupContract {
* @param taskDefinitions - The Kibana task definitions dictionary
*/
registerTaskDefinitions: (taskDefinitions: TaskDefinitionRegistry) => void;
registerCanEncryptedSavedObjects: (canEncrypt: boolean) => void;
}
export type TaskManagerStartContract = Pick<
@ -86,11 +89,13 @@ export type TaskManagerStartContract = Pick<
removeIfExists: TaskStore['remove'];
} & {
getRegisteredTypes: () => string[];
registerEncryptedSavedObjectsClient: (client: EncryptedSavedObjectsClient) => void;
};
export interface TaskManagerPluginsStart {
cloud?: CloudStart;
usageCollection?: UsageCollectionStart;
spaces?: SpacesPluginStart;
}
export interface TaskManagerPluginsSetup {
@ -128,6 +133,7 @@ export class TaskManagerPlugin
private kibanaDiscoveryService?: KibanaDiscoveryService;
private heapSizeLimit: number = 0;
private numOfKibanaInstances$: Subject<number> = new BehaviorSubject(1);
private canEncryptSavedObjects: boolean;
constructor(private readonly initContext: PluginInitializerContext) {
this.initContext = initContext;
@ -138,6 +144,7 @@ export class TaskManagerPlugin
this.nodeRoles = initContext.node.roles;
this.shouldRunBackgroundTasks = this.nodeRoles.backgroundTasks;
this.adHocTaskCounter = new AdHocTaskCounter();
this.canEncryptSavedObjects = false;
}
isNodeBackgroundTasksOnly() {
@ -168,6 +175,7 @@ export class TaskManagerPlugin
});
setupSavedObjects(core.savedObjects, this.config);
this.taskManagerId = this.initContext.env.instanceUuid;
if (!this.taskManagerId) {
@ -272,12 +280,15 @@ export class TaskManagerPlugin
registerTaskDefinitions: (taskDefinition: TaskDefinitionRegistry) => {
this.definitions.registerTaskDefinitions(taskDefinition);
},
registerCanEncryptedSavedObjects: (canEncrypt: boolean) => {
this.canEncryptSavedObjects = canEncrypt;
},
};
}
public start(
{ savedObjects, elasticsearch, executionContext, docLinks }: CoreStart,
{ cloud }: TaskManagerPluginsStart
{ savedObjects, elasticsearch, executionContext, security }: CoreStart,
{ cloud, spaces }: TaskManagerPluginsStart
): TaskManagerStartContract {
const savedObjectsRepository = savedObjects.createInternalRepository([
TASK_SO_NAME,
@ -300,6 +311,7 @@ export class TaskManagerPlugin
const taskStore = new TaskStore({
serializer,
savedObjectsRepository,
savedObjectsService: savedObjects,
esClient: elasticsearch.client.asInternalUser,
index: TASK_MANAGER_INDEX,
definitions: this.definitions,
@ -308,6 +320,9 @@ export class TaskManagerPlugin
allowReadingInvalidState: this.config.allow_reading_invalid_state,
logger: this.logger,
requestTimeouts: this.config.request_timeouts,
security,
canEncryptSavedObjects: this.canEncryptSavedObjects,
spaces,
});
const isServerless = this.initContext.env.packageInfo.buildFlavor === 'serverless';
@ -411,6 +426,9 @@ export class TaskManagerPlugin
bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),
getRegisteredTypes: () => this.definitions.getAllTypes(),
bulkUpdateState: (...args) => taskScheduling.bulkUpdateState(...args),
registerEncryptedSavedObjectsClient: (client: EncryptedSavedObjectsClient) => {
taskStore.registerEncryptedSavedObjectsClient(client);
},
};
}

View file

@ -10,6 +10,7 @@
import type { ObjectType, TypeOf } from '@kbn/config-schema';
import { schema } from '@kbn/config-schema';
import { isNumber } from 'lodash';
import type { KibanaRequest } from '@kbn/core/server';
import { isErr, tryAsResult } from './lib/result_type';
import type { Interval } from './lib/intervals';
import { isInterval, parseIntervalAsMillisecond } from './lib/intervals';
@ -250,6 +251,12 @@ export interface IntervalSchedule {
interval: Interval;
}
export interface TaskUserScope {
apiKeyId: string;
spaceId?: string;
apiKeyCreatedByUser: boolean;
}
/*
* A task instance represents all of the data required to store, fetch,
* and execute a task.
@ -356,6 +363,16 @@ export interface TaskInstance {
*/
partition?: number;
/**
* Used to allow tasks to be scoped to a user via their API key
*/
apiKey?: string;
/**
* Meta data related to the API key associated with this task
*/
userScope?: TaskUserScope;
/*
* Optionally override the priority defined in the task type for this specific task instance
*/
@ -492,8 +509,16 @@ export type SerializedConcreteTaskInstance = Omit<
retryAt: string | null;
runAt: string;
partition?: number;
apiKey?: string;
userScope?: TaskUserScope;
};
export type PartialSerializedConcreteTaskInstance = Partial<SerializedConcreteTaskInstance> & {
id: SerializedConcreteTaskInstance['id'];
};
export interface ApiKeyOptions {
request?: KibanaRequest;
}
export type ScheduleOptions = Record<string, unknown> & ApiKeyOptions;

View file

@ -17,6 +17,7 @@ import { mockLogger } from './test_utils';
import { TaskTypeDictionary } from './task_type_dictionary';
import { taskManagerMock } from './mocks';
import { omit } from 'lodash';
import { httpServerMock } from '@kbn/core/server/mocks';
let fakeTimer: sinon.SinonFakeTimers;
jest.mock('uuid', () => ({
@ -71,13 +72,43 @@ describe('TaskScheduling', () => {
};
await taskScheduling.schedule(task);
expect(mockTaskStore.schedule).toHaveBeenCalled();
expect(mockTaskStore.schedule).toHaveBeenCalledWith({
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
});
expect(mockTaskStore.schedule).toHaveBeenCalledWith(
{
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
undefined
);
});
test('should call task store with request if provided', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task = {
taskType: 'foo',
params: {},
state: {},
};
const mockRequest = httpServerMock.createKibanaRequest();
await taskScheduling.schedule(task, { request: mockRequest });
expect(mockTaskStore.schedule).toHaveBeenCalled();
expect(mockTaskStore.schedule).toHaveBeenCalledWith(
{
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
{
request: mockRequest,
}
);
});
test('allows scheduling tasks that are disabled', async () => {
@ -90,13 +121,16 @@ describe('TaskScheduling', () => {
};
await taskScheduling.schedule(task);
expect(mockTaskStore.schedule).toHaveBeenCalled();
expect(mockTaskStore.schedule).toHaveBeenCalledWith({
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: false,
});
expect(mockTaskStore.schedule).toHaveBeenCalledWith(
{
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: false,
},
undefined
);
});
test('allows scheduling existing tasks that may have already been scheduled', async () => {
@ -839,15 +873,18 @@ describe('TaskScheduling', () => {
};
await taskScheduling.bulkSchedule([task]);
expect(mockTaskStore.bulkSchedule).toHaveBeenCalled();
expect(mockTaskStore.bulkSchedule).toHaveBeenCalledWith([
{
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
]);
expect(mockTaskStore.bulkSchedule).toHaveBeenCalledWith(
[
{
...task,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
],
undefined
);
});
test('allows scheduling tasks that are disabled', async () => {
@ -865,22 +902,25 @@ describe('TaskScheduling', () => {
};
await taskScheduling.bulkSchedule([task1, task2]);
expect(mockTaskStore.bulkSchedule).toHaveBeenCalled();
expect(mockTaskStore.bulkSchedule).toHaveBeenCalledWith([
{
...task1,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
{
...task2,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: false,
},
]);
expect(mockTaskStore.bulkSchedule).toHaveBeenCalledWith(
[
{
...task1,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
{
...task2,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: false,
},
],
undefined
);
});
test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {

View file

@ -6,7 +6,7 @@
*/
import pMap from 'p-map';
import { chunk, flatten } from 'lodash';
import { chunk, flatten, omit } from 'lodash';
import agent from 'elastic-apm-node';
import type { Logger } from '@kbn/core/server';
import type { Middleware } from './lib/middleware';
@ -14,6 +14,7 @@ import { parseIntervalAsMillisecond } from './lib/intervals';
import type {
ConcreteTaskInstance,
IntervalSchedule,
ScheduleOptions,
TaskInstanceWithDeprecatedFields,
TaskInstanceWithId,
} from './task';
@ -79,10 +80,10 @@ export class TaskScheduling {
*/
public async schedule(
taskInstance: TaskInstanceWithDeprecatedFields,
options?: Record<string, unknown>
options?: ScheduleOptions
): Promise<ConcreteTaskInstance> {
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
...omit(options, 'apiKey', 'request'),
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
@ -91,11 +92,18 @@ export class TaskScheduling {
? agent.currentTraceparent
: '';
return await this.store.schedule({
...modifiedTask,
traceparent: traceparent || '',
enabled: modifiedTask.enabled ?? true,
});
return await this.store.schedule(
{
...modifiedTask,
traceparent: traceparent || '',
enabled: modifiedTask.enabled ?? true,
},
options?.request
? {
request: options?.request,
}
: undefined
);
}
/**
@ -106,7 +114,7 @@ export class TaskScheduling {
*/
public async bulkSchedule(
taskInstances: TaskInstanceWithDeprecatedFields[],
options?: Record<string, unknown>
options?: ScheduleOptions
): Promise<ConcreteTaskInstance[]> {
const traceparent =
agent.currentTransaction && agent.currentTransaction.type !== 'request'
@ -115,7 +123,7 @@ export class TaskScheduling {
const modifiedTasks = await Promise.all(
taskInstances.map(async (taskInstance) => {
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
...omit(options, 'apiKey', 'request'),
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
return {
@ -126,7 +134,14 @@ export class TaskScheduling {
})
);
return await this.store.bulkSchedule(modifiedTasks);
return await this.store.bulkSchedule(
modifiedTasks,
options?.request
? {
request: options?.request,
}
: undefined
);
}
public async bulkDisable(taskIds: string[], clearStateIdsOrBoolean?: string[] | boolean) {

View file

@ -14,7 +14,13 @@ import { first } from 'rxjs';
import type { TaskInstance, SerializedConcreteTaskInstance } from './task';
import { TaskStatus, TaskLifecycleResult } from './task';
import type { ElasticsearchClientMock } from '@kbn/core/server/mocks';
import { elasticsearchServiceMock, savedObjectsServiceMock } from '@kbn/core/server/mocks';
import {
coreMock,
elasticsearchServiceMock,
httpServerMock,
savedObjectsServiceMock,
} from '@kbn/core/server/mocks';
import { savedObjectsClientMock } from '@kbn/core-saved-objects-api-server-mocks';
import type { SearchOpts, AggregationOpts } from './task_store';
import { TaskStore, taskInstanceToAttributes } from './task_store';
import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
@ -26,6 +32,12 @@ import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { asErr, asOk } from './lib/result_type';
import type { UpdateByQueryResponse } from '@elastic/elasticsearch/lib/api/types';
import { MsearchError } from './lib/msearch_error';
import { getApiKeyAndUserScope } from './lib/api_key_utils';
import { spacesMock } from '@kbn/spaces-plugin/server/mocks';
import type {
EncryptedSavedObjectsClient,
EncryptedSavedObjectsClientOptions,
} from '@kbn/encrypted-saved-objects-shared';
const mockGetValidatedTaskInstanceFromReading = jest.fn();
const mockGetValidatedTaskInstanceForUpdating = jest.fn();
@ -40,12 +52,31 @@ jest.mock('./task_validator', () => {
};
});
jest.mock('./lib/api_key_utils', () => ({
getApiKeyAndUserScope: jest.fn(),
}));
function createEncryptedSavedObjectsClientMock(opts?: EncryptedSavedObjectsClientOptions) {
return {
getDecryptedAsInternalUser: jest.fn(),
createPointInTimeFinderDecryptedAsInternalUser: jest.fn((findOptions, deps) =>
savedObjectsClientMock.create().createPointInTimeFinder(findOptions, deps)
),
} as unknown as jest.Mocked<EncryptedSavedObjectsClient>;
}
const savedObjectsClient = savedObjectsRepositoryMock.create();
const scopedSavedObjectsClient = savedObjectsRepositoryMock.create();
const esoClient = createEncryptedSavedObjectsClientMock();
const serializer = savedObjectsServiceMock.createSerializer();
const adHocTaskCounter = new AdHocTaskCounter();
const randomId = () => `id-${_.random(1, 20)}`;
const coreStart = coreMock.createStart();
const spacesStart = spacesMock.createStart();
beforeEach(() => {
jest.resetAllMocks();
jest.requireMock('./task_validator').TaskValidator.mockImplementation(() => {
@ -113,14 +144,21 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
spaces: spacesStart,
canEncryptSavedObjects: true,
});
store.registerEncryptedSavedObjectsClient(esoClient);
});
afterEach(() => {
adHocTaskCounter.reset();
jest.resetAllMocks();
});
async function testSchedule(task: unknown) {
async function testSchedule(task: unknown, options?: Record<string, unknown>) {
savedObjectsClient.create.mockImplementation(async (type: string, attributes: unknown) => ({
id: 'testid',
type,
@ -128,7 +166,7 @@ describe('TaskStore', () => {
references: [],
version: '123',
}));
const result = await store.schedule(task as TaskInstance);
const result = await store.schedule(task as TaskInstance, options);
expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
@ -220,12 +258,124 @@ describe('TaskStore', () => {
expect(attributes.state).toEqual('{}');
});
test('schedule a task with API key if request is provided', async () => {
const task = {
id: 'id',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
traceparent: 'apmTraceparent',
};
const mockApiKey = Buffer.from('apiKeyId:apiKey').toString('base64');
const mockUserScope = {
apiKeyId: 'apiKeyId',
apiKeyCreatedBy: 'testUser',
spaceId: 'testSpace',
};
const request = httpServerMock.createKibanaRequest();
const apiKeyAndUserScopeMap = new Map();
apiKeyAndUserScopeMap.set('id', {
apiKey: mockApiKey,
userScope: mockUserScope,
});
(getApiKeyAndUserScope as jest.Mock).mockResolvedValueOnce(apiKeyAndUserScopeMap);
coreStart.savedObjects.getScopedClient.mockReturnValueOnce(scopedSavedObjectsClient);
scopedSavedObjectsClient.create.mockImplementation(
async (type: string, attributes: unknown) => ({
id: 'testid',
type,
attributes,
references: [],
version: '123',
})
);
const result = await store.schedule(task as TaskInstance, { request });
expect(scopedSavedObjectsClient.create).toHaveBeenCalledWith(
'task',
{
attempts: 0,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
apiKey: mockApiKey,
userScope: mockUserScope,
},
{
id: 'id',
refresh: false,
}
);
expect(getApiKeyAndUserScope).toHaveBeenCalledWith(
[task],
request,
true,
coreStart.security,
spacesStart
);
expect(savedObjectsClient.create).not.toHaveBeenCalled();
expect(result).toEqual({
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
traceparent: 'apmTraceparent',
attempts: 0,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
runAt: mockedDate,
status: 'idle',
partition: 225,
apiKey: mockApiKey,
userScope: mockUserScope,
id: 'testid',
version: '123',
});
});
test('errors if the task type is unknown', async () => {
await expect(testSchedule({ taskType: 'nope', params: {}, state: {} })).rejects.toThrow(
/Unsupported task type "nope"/i
);
});
test('errors if API key could not be created', async () => {
const task = {
id: 'id',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
traceparent: 'apmTraceparent',
};
(getApiKeyAndUserScope as jest.Mock).mockRejectedValueOnce(
new Error('Something went wrong!')
);
const request = httpServerMock.createKibanaRequest();
await expect(store.schedule(task as TaskInstance, { request })).rejects.toThrow(
'Something went wrong!'
);
expect(getApiKeyAndUserScope).toHaveBeenCalled();
});
test('pushes error from saved objects client to errors$', async () => {
const task: TaskInstance = {
id: 'id',
@ -290,6 +440,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
@ -362,7 +514,45 @@ describe('TaskStore', () => {
typeof elasticsearchServiceMock.createClusterClient
>['asInternalUser'];
beforeAll(() => {
const mockTask = {
taskType: 'taskWithApiKey',
params: '{}',
state: '{}',
enabled: true,
schedule: {
interval: '30s',
},
traceparent: '',
attempts: 1,
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: '2019-02-12T21:01:22.479Z',
retryAt: '2019-02-12T21:01:22.479Z',
runAt: '2019-02-12T21:01:22.479Z',
status: 'running',
partition: 237,
userScope: {
apiKeyId: 'EJYCtpUBGuyFd3FroZmZ',
spaceId: 'default',
apiKeyCreatedByUser: false,
},
ownerId: 'kibana:5b2de169-2785-441b-ae8c-186a1936b17d',
id: 'task1',
version: '123',
};
beforeEach(() => {
const mockSerializer = savedObjectsServiceMock.createSerializer();
mockSerializer.isRawSavedObject = jest.fn().mockReturnValue(true);
mockSerializer.rawToSavedObject = jest.fn().mockImplementation((doc) => ({
id: 'task1',
version: '123',
type: 'task',
references: [],
attributes: {
...doc._source.task,
},
}));
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
childEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
esClient.child.mockReturnValue(childEsClient as unknown as Client);
@ -370,7 +560,7 @@ describe('TaskStore', () => {
logger: mockLogger(),
index: 'tasky',
taskManagerId: '',
serializer,
serializer: mockSerializer,
esClient,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -379,7 +569,29 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
canEncryptSavedObjects: true,
});
esoClient.createPointInTimeFinderDecryptedAsInternalUser = jest.fn().mockResolvedValue({
close: jest.fn(),
find: function* asyncGenerator() {
yield {
saved_objects: [
{
id: 'task1',
attributes: {
...mockTask,
apiKey: 'decryptedApiKey',
},
},
],
};
},
});
store.registerEncryptedSavedObjectsClient(esoClient);
});
async function testMsearch(
@ -464,6 +676,35 @@ describe('TaskStore', () => {
});
});
test('should return tasks with decrypted API keys', async () => {
const { result } = await testMsearch(
[{}],
[
{
hits: [
{
_index: '.kibana_task_manager_8.16.0_001',
_source: {
task: { ...mockTask, apiKey: 'encryptedKey' },
},
},
],
},
]
);
expect(result.docs[0]).toEqual({
...mockTask,
retryAt: new Date(mockTask.retryAt),
runAt: new Date(mockTask.runAt),
scheduledAt: new Date(mockTask.scheduledAt),
startedAt: new Date(mockTask.startedAt),
state: {},
params: {},
apiKey: 'decryptedApiKey',
});
});
test('pushes error from call cluster to errors$', async () => {
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
childEsClient.msearch.mockResponse({
@ -514,6 +755,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
@ -638,6 +881,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
@ -795,6 +1040,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
@ -948,6 +1195,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
@ -1360,8 +1609,36 @@ describe('TaskStore', () => {
describe('remove', () => {
let store: TaskStore;
const mockApiKey = Buffer.from('apiKeyId:apiKey').toString('base64');
beforeAll(() => {
const mockTask = {
id: 'task1',
type: 'test',
attributes: {
attempts: 0,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
stateVersion: 1,
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
apiKey: mockApiKey,
userScope: {
apiKeyId: 'apiKeyId',
apiKeyCreatedBy: 'testUser',
spaceId: 'testSpace',
},
},
references: [],
version: '123',
};
beforeEach(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
@ -1375,17 +1652,43 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
spaces: spacesStart,
canEncryptSavedObjects: true,
});
esoClient.createPointInTimeFinderDecryptedAsInternalUser = jest.fn().mockResolvedValue({
close: jest.fn(),
find: function* asyncGenerator() {
yield { saved_objects: [mockTask] };
},
});
store.registerEncryptedSavedObjectsClient(esoClient);
});
test('removes the task with the specified id', async () => {
savedObjectsClient.get.mockResolvedValueOnce(mockTask);
const id = randomId();
const result = await store.remove(id);
expect(result).toBeUndefined();
expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id, { refresh: false });
});
test('invalidates API key of task with api key', async () => {
savedObjectsClient.get.mockResolvedValueOnce(mockTask);
const id = 'task1';
const result = await store.remove(id);
expect(result).toBeUndefined();
expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id, { refresh: false });
expect(coreStart.security.authc.apiKeys.invalidateAsInternalUser).toHaveBeenCalledWith({
ids: ['apiKeyId'],
});
});
test('pushes error from saved objects client to errors$', async () => {
savedObjectsClient.get.mockResolvedValueOnce(mockTask);
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.delete.mockRejectedValue(new Error('Failure'));
await expect(store.remove(randomId())).rejects.toThrowErrorMatchingInlineSnapshot(
@ -1397,10 +1700,66 @@ describe('TaskStore', () => {
describe('bulkRemove', () => {
let store: TaskStore;
const mockApiKey1 = Buffer.from('apiKeyId1:apiKey').toString('base64');
const mockApiKey2 = Buffer.from('apiKeyId2:apiKey').toString('base64');
const mockTask1 = {
id: 'task1',
type: 'test',
attributes: {
attempts: 0,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
stateVersion: 1,
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
apiKey: mockApiKey1,
userScope: {
apiKeyId: 'apiKeyId1',
apiKeyCreatedBy: 'testUser',
spaceId: 'testSpace',
},
},
references: [],
version: '123',
};
const mockTask2 = {
id: 'task2',
type: 'test',
attributes: {
attempts: 0,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
stateVersion: 1,
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
apiKey: mockApiKey2,
userScope: {
apiKeyId: 'apiKeyId2',
apiKeyCreatedBy: 'testUser',
spaceId: 'testSpace',
},
},
references: [],
version: '123',
};
const tasksIdsToDelete = [randomId(), randomId()];
beforeAll(() => {
beforeEach(() => {
store = new TaskStore({
logger: mockLogger(),
index: 'tasky',
@ -1414,10 +1773,26 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
spaces: spacesStart,
canEncryptSavedObjects: true,
});
esoClient.createPointInTimeFinderDecryptedAsInternalUser = jest.fn().mockResolvedValue({
close: jest.fn(),
find: function* asyncGenerator() {
yield { saved_objects: [mockTask1, mockTask2] };
},
});
store.registerEncryptedSavedObjectsClient(esoClient);
});
test('removes the tasks with the specified ids', async () => {
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [mockTask1, mockTask2],
});
const result = await store.bulkRemove(tasksIdsToDelete);
expect(result).toBeUndefined();
expect(savedObjectsClient.bulkDelete).toHaveBeenCalledWith(
@ -1429,7 +1804,21 @@ describe('TaskStore', () => {
);
});
test('bulk invalidates API key of tasks with API keys', async () => {
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [mockTask1, mockTask2],
});
const result = await store.bulkRemove(['task1', 'task2']);
expect(result).toBeUndefined();
expect(coreStart.security.authc.apiKeys.invalidateAsInternalUser).toHaveBeenCalledWith({
ids: ['apiKeyId1', 'apiKeyId2'],
});
});
test('pushes error from saved objects client to errors$', async () => {
savedObjectsClient.bulkGet.mockResolvedValueOnce({
saved_objects: [mockTask1, mockTask2],
});
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.bulkDelete.mockRejectedValue(new Error('Failure'));
await expect(store.bulkRemove(tasksIdsToDelete)).rejects.toThrowErrorMatchingInlineSnapshot(
@ -1456,6 +1845,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
@ -1519,6 +1910,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
@ -1617,6 +2010,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
expect(await store.getLifecycle(task.id)).toEqual(status);
@ -1642,6 +2037,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
expect(await store.getLifecycle(randomId())).toEqual(TaskLifecycleResult.NotFound);
@ -1665,6 +2062,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
return expect(store.getLifecycle(randomId())).rejects.toThrow('Bad Request');
@ -1688,11 +2087,18 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
spaces: spacesStart,
canEncryptSavedObjects: true,
});
store.registerEncryptedSavedObjectsClient(esoClient);
});
afterEach(() => {
adHocTaskCounter.reset();
jest.resetAllMocks();
});
async function testBulkSchedule(task: unknown) {
@ -1799,6 +2205,166 @@ describe('TaskStore', () => {
]);
});
test('bulk schedules tasks with API key if request is provided', async () => {
const task1 = {
id: 'task1',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
};
const task2 = {
id: 'task2',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
};
const mockApiKey = Buffer.from('apiKeyId:apiKey').toString('base64');
const mockUserScope = {
apiKeyId: 'apiKeyId',
apiKeyCreatedBy: 'testUser',
spaceId: 'testSpace',
};
const request = httpServerMock.createKibanaRequest();
const apiKeyAndUserScopeMap = new Map();
apiKeyAndUserScopeMap.set('task1', {
apiKey: mockApiKey,
userScope: mockUserScope,
});
apiKeyAndUserScopeMap.set('task2', {
apiKey: mockApiKey,
userScope: mockUserScope,
});
(getApiKeyAndUserScope as jest.Mock).mockResolvedValueOnce(apiKeyAndUserScopeMap);
coreStart.savedObjects.getScopedClient.mockReturnValueOnce(scopedSavedObjectsClient);
scopedSavedObjectsClient.bulkCreate.mockImplementationOnce(async () => ({
saved_objects: [
{
id: 'task1',
type: 'test',
attributes: {
attempts: 0,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
stateVersion: 1,
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
apiKey: mockApiKey,
userScope: mockUserScope,
},
references: [],
version: '123',
},
{
id: 'task2',
type: 'test',
attributes: {
attempts: 0,
params: '{"hello":"world"}',
retryAt: null,
runAt: '2019-02-12T21:01:22.479Z',
scheduledAt: '2019-02-12T21:01:22.479Z',
startedAt: null,
state: '{"foo":"bar"}',
stateVersion: 1,
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
apiKey: mockApiKey,
userScope: mockUserScope,
},
references: [],
version: '123',
},
],
}));
const result = await store.bulkSchedule([task1, task2], { request });
expect(getApiKeyAndUserScope).toHaveBeenCalledWith(
[task1, task2],
request,
true,
coreStart.security,
spacesStart
);
expect(savedObjectsClient.create).not.toHaveBeenCalled();
expect(result).toEqual([
{
id: 'task1',
attempts: 0,
params: { hello: 'world' },
retryAt: null,
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
state: { foo: 'bar' },
stateVersion: 1,
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
apiKey: mockApiKey,
userScope: mockUserScope,
version: '123',
},
{
id: 'task2',
attempts: 0,
params: { hello: 'world' },
retryAt: null,
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
state: { foo: 'bar' },
stateVersion: 1,
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
apiKey: mockApiKey,
userScope: mockUserScope,
version: '123',
},
]);
});
test('errors if API key could not be created', async () => {
const task = {
id: 'id',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
traceparent: 'apmTraceparent',
};
(getApiKeyAndUserScope as jest.Mock).mockRejectedValueOnce(
new Error('Something went wrong!')
);
const request = httpServerMock.createKibanaRequest();
await expect(store.bulkSchedule([task as TaskInstance], { request })).rejects.toThrow(
'Something went wrong!'
);
expect(getApiKeyAndUserScope).toHaveBeenCalled();
});
test('errors if the task type is unknown', async () => {
await expect(testBulkSchedule([{ taskType: 'nope', params: {}, state: {} }])).rejects.toThrow(
/Unsupported task type "nope"/i
@ -1864,6 +2430,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
expect(jest.requireMock('./task_validator').TaskValidator).toHaveBeenCalledWith({
@ -1889,6 +2457,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
expect(jest.requireMock('./task_validator').TaskValidator).toHaveBeenCalledWith({
@ -1923,6 +2493,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
test('should pass requestTimeout', async () => {
@ -1960,6 +2532,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});
@ -2078,6 +2652,8 @@ describe('TaskStore', () => {
requestTimeouts: {
update_by_query: 1000,
},
savedObjectsService: coreStart.savedObjects,
security: coreStart.security,
});
});

View file

@ -15,9 +15,12 @@ import { omit, defaults, get } from 'lodash';
import type { SavedObjectError } from '@kbn/core-saved-objects-common';
import type { estypes } from '@elastic/elasticsearch';
import type { SavedObjectsBulkDeleteResponse, Logger } from '@kbn/core/server';
import type {
SavedObjectsBulkDeleteResponse,
Logger,
SavedObjectsServiceStart,
SecurityServiceStart,
KibanaRequest,
SavedObject,
ISavedObjectsSerializer,
SavedObjectsRawDoc,
@ -26,10 +29,16 @@ import type {
ElasticsearchClient,
} from '@kbn/core/server';
import { SECURITY_EXTENSION_ID, SPACES_EXTENSION_ID } from '@kbn/core/server';
import type { SpacesPluginStart } from '@kbn/spaces-plugin/server';
import type { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-shared';
import { decodeRequestVersion, encodeVersion } from '@kbn/core-saved-objects-base-server-internal';
import { nodeBuilder } from '@kbn/es-query';
import type { RequestTimeoutsConfig } from './config';
import type { Result } from './lib/result_type';
import { asOk, asErr } from './lib/result_type';
import { asOk, asErr, unwrap } from './lib/result_type';
import type {
ConcreteTaskInstance,
@ -39,6 +48,7 @@ import type {
SerializedConcreteTaskInstance,
PartialConcreteTaskInstance,
PartialSerializedConcreteTaskInstance,
ApiKeyOptions,
} from './task';
import { TaskStatus, TaskLifecycleResult } from './task';
@ -50,6 +60,8 @@ import { MAX_PARTITIONS } from './lib/task_partitioner';
import type { ErrorOutput } from './lib/bulk_operation_buffer';
import { MsearchError } from './lib/msearch_error';
import { BulkUpdateError } from './lib/bulk_update_error';
import { TASK_SO_NAME } from './saved_objects';
import { getApiKeyAndUserScope } from './lib/api_key_utils';
export interface StoreOpts {
esClient: ElasticsearchClient;
@ -57,11 +69,16 @@ export interface StoreOpts {
taskManagerId: string;
definitions: TaskTypeDictionary;
savedObjectsRepository: ISavedObjectsRepository;
savedObjectsService: SavedObjectsServiceStart;
serializer: ISavedObjectsSerializer;
adHocTaskCounter: AdHocTaskCounter;
allowReadingInvalidState: boolean;
logger: Logger;
requestTimeouts: RequestTimeoutsConfig;
security: SecurityServiceStart;
canEncryptSavedObjects?: boolean;
esoClient?: EncryptedSavedObjectsClient;
spaces?: SpacesPluginStart;
}
export interface SearchOpts {
@ -121,12 +138,17 @@ export class TaskStore {
public readonly taskValidator: TaskValidator;
private esClient: ElasticsearchClient;
private esoClient?: EncryptedSavedObjectsClient;
private esClientWithoutRetries: ElasticsearchClient;
private definitions: TaskTypeDictionary;
private savedObjectsRepository: ISavedObjectsRepository;
private savedObjectsService: SavedObjectsServiceStart;
private serializer: ISavedObjectsSerializer;
private adHocTaskCounter: AdHocTaskCounter;
private requestTimeouts: RequestTimeoutsConfig;
private security: SecurityServiceStart;
private canEncryptSavedObjects?: boolean;
private spaces?: SpacesPluginStart;
/**
* Constructs a new TaskStore.
@ -139,11 +161,13 @@ export class TaskStore {
*/
constructor(opts: StoreOpts) {
this.esClient = opts.esClient;
this.esoClient = opts.esoClient;
this.index = opts.index;
this.taskManagerId = opts.taskManagerId;
this.definitions = opts.definitions;
this.serializer = opts.serializer;
this.savedObjectsRepository = opts.savedObjectsRepository;
this.savedObjectsService = opts.savedObjectsService;
this.adHocTaskCounter = opts.adHocTaskCounter;
this.taskValidator = new TaskValidator({
logger: opts.logger,
@ -156,6 +180,104 @@ export class TaskStore {
maxRetries: 0,
});
this.requestTimeouts = opts.requestTimeouts;
this.security = opts.security;
this.spaces = opts.spaces;
this.canEncryptSavedObjects = opts.canEncryptSavedObjects;
}
public registerEncryptedSavedObjectsClient(client: EncryptedSavedObjectsClient) {
this.esoClient = client;
}
private canEncryptSo() {
return !!(this.esoClient && this.canEncryptSavedObjects);
}
private getSoClientForCreate(options: ApiKeyOptions) {
if (options.request) {
return this.savedObjectsService.getScopedClient(options.request, {
includedHiddenTypes: [TASK_SO_NAME],
excludedExtensions: [SECURITY_EXTENSION_ID, SPACES_EXTENSION_ID],
});
}
return this.savedObjectsRepository;
}
private async maybeGetApiKeyFromRequest(taskInstances: TaskInstance[], request?: KibanaRequest) {
if (!request) {
return null;
}
let userScopeAndApiKey;
try {
userScopeAndApiKey = await getApiKeyAndUserScope(
taskInstances,
request,
this.canEncryptSo(),
this.security,
this.spaces
);
} catch (e) {
this.errors$.next(e);
throw e;
}
return userScopeAndApiKey;
}
private async bulkGetDecryptedTaskApiKeys(ids: string[]) {
const result = new Map<string, string | undefined>();
if (!this.canEncryptSo() || !ids.length) {
return result;
}
const kueryNode = nodeBuilder.or(
ids.map((id) => {
return nodeBuilder.is(`${TASK_SO_NAME}.id`, `${TASK_SO_NAME}:${id}`);
})
);
const finder =
await this.esoClient!.createPointInTimeFinderDecryptedAsInternalUser<SerializedConcreteTaskInstance>(
{
type: TASK_SO_NAME,
filter: kueryNode,
}
);
for await (const response of finder.find()) {
response.saved_objects.forEach((savedObject) => {
result.set(savedObject.id, savedObject.attributes.apiKey);
});
}
await finder.close();
return result;
}
private async bulkGetAndMergeTasksWithDecryptedApiKey(tasks: ConcreteTaskInstance[]) {
const ids: string[] = [];
tasks.forEach((task) => {
if (task.apiKey) {
ids.push(task.id);
}
});
if (!ids.length) {
return tasks;
}
const decryptedTaskApiKeysMap = await this.bulkGetDecryptedTaskApiKeys(ids);
const tasksWithDecryptedApiKeys = tasks.map((task) => ({
...task,
...(decryptedTaskApiKeysMap.get(task.id)
? { apiKey: decryptedTaskApiKeysMap.get(task.id) }
: {}),
}));
return tasksWithDecryptedApiKeys;
}
/**
@ -174,17 +296,30 @@ export class TaskStore {
*
* @param task - The task being scheduled.
*/
public async schedule(taskInstance: TaskInstance): Promise<ConcreteTaskInstance> {
public async schedule(
taskInstance: TaskInstance,
options?: ApiKeyOptions
): Promise<ConcreteTaskInstance> {
this.definitions.ensureHas(taskInstance.taskType);
const apiKeyAndUserScopeMap =
(await this.maybeGetApiKeyFromRequest([taskInstance], options?.request)) || new Map();
const { apiKey, userScope } = apiKeyAndUserScopeMap.get(taskInstance.id) || {};
const soClient = this.getSoClientForCreate(options || {});
let savedObject;
try {
const id = taskInstance.id || v4();
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
savedObject = await this.savedObjectsRepository.create<SerializedConcreteTaskInstance>(
savedObject = await soClient.create<SerializedConcreteTaskInstance>(
'task',
taskInstanceToAttributes(validatedTaskInstance, id),
{
...taskInstanceToAttributes(validatedTaskInstance, id),
...(userScope ? { userScope } : {}),
...(apiKey ? { apiKey } : {}),
},
{ id, refresh: false }
);
if (get(taskInstance, 'schedule.interval', null) == null) {
@ -204,25 +339,37 @@ export class TaskStore {
*
* @param tasks - The tasks being scheduled.
*/
public async bulkSchedule(taskInstances: TaskInstance[]): Promise<ConcreteTaskInstance[]> {
public async bulkSchedule(
taskInstances: TaskInstance[],
options?: ApiKeyOptions
): Promise<ConcreteTaskInstance[]> {
const apiKeyAndUserScopeMap =
(await this.maybeGetApiKeyFromRequest(taskInstances, options?.request)) || new Map();
const soClient = this.getSoClientForCreate(options || {});
const objects = taskInstances.map((taskInstance) => {
const { apiKey, userScope } = apiKeyAndUserScopeMap.get(taskInstance.id) || {};
const id = taskInstance.id || v4();
this.definitions.ensureHas(taskInstance.taskType);
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
return {
type: 'task',
attributes: taskInstanceToAttributes(validatedTaskInstance, id),
attributes: {
...taskInstanceToAttributes(validatedTaskInstance, id),
...(apiKey ? { apiKey } : {}),
...(userScope ? { userScope } : {}),
},
id,
};
});
let savedObjects;
try {
savedObjects = await this.savedObjectsRepository.bulkCreate<SerializedConcreteTaskInstance>(
objects,
{ refresh: false }
);
savedObjects = await soClient.bulkCreate<SerializedConcreteTaskInstance>(objects, {
refresh: false,
});
this.adHocTaskCounter.increment(
taskInstances.filter((task) => {
return get(task, 'schedule.interval', null) == null;
@ -440,6 +587,15 @@ export class TaskStore {
* @returns {Promise<void>}
*/
public async remove(id: string): Promise<void> {
const taskInstance = await this.get(id);
const { apiKey, userScope } = taskInstance;
if (apiKey && userScope) {
if (!userScope.apiKeyCreatedByUser) {
await this.security.authc.apiKeys.invalidateAsInternalUser({ ids: [userScope.apiKeyId] });
}
}
try {
await this.savedObjectsRepository.delete('task', id, { refresh: false });
} catch (e) {
@ -455,6 +611,25 @@ export class TaskStore {
* @returns {Promise<SavedObjectsBulkDeleteResponse>}
*/
public async bulkRemove(taskIds: string[]): Promise<SavedObjectsBulkDeleteResponse> {
const taskInstances = await this.bulkGet(taskIds);
const apiKeyIdsToRemove: string[] = [];
taskInstances.forEach((taskInstance) => {
const unwrappedTaskInstance = unwrap(taskInstance) as ConcreteTaskInstance;
const { apiKey, userScope } = unwrappedTaskInstance;
if (apiKey && userScope) {
if (!userScope.apiKeyCreatedByUser) {
apiKeyIdsToRemove.push(userScope.apiKeyId);
}
}
});
if (apiKeyIdsToRemove.length) {
await this.security.authc.apiKeys.invalidateAsInternalUser({
ids: [...new Set(apiKeyIdsToRemove)],
});
}
try {
const savedObjectsToDelete = taskIds.map((taskId) => ({ id: taskId, type: 'task' }));
return await this.savedObjectsRepository.bulkDelete(savedObjectsToDelete, { refresh: false });
@ -479,7 +654,10 @@ export class TaskStore {
throw e;
}
const taskInstance = savedObjectToConcreteTaskInstance(result);
return taskInstance;
const tasksWithDecryptedApiKeys = await this.bulkGetAndMergeTasksWithDecryptedApiKey([
taskInstance,
]);
return tasksWithDecryptedApiKeys[0];
}
/**
@ -498,12 +676,23 @@ export class TaskStore {
this.errors$.next(e);
throw e;
}
const tasks: ConcreteTaskInstance[] = [];
result.saved_objects.forEach((task) => {
if (!task.error) {
tasks.push(savedObjectToConcreteTaskInstance(task));
}
});
const tasksWithDecryptedApiKeys = await this.bulkGetAndMergeTasksWithDecryptedApiKey(tasks);
const taskMap = new Map();
tasksWithDecryptedApiKeys.forEach((task) => taskMap.set(task.id, task));
return result.saved_objects.map((task) => {
if (task.error) {
return asErr({ id: task.id, type: task.type, error: task.error });
}
const taskInstance = savedObjectToConcreteTaskInstance(task);
return asOk(taskInstance);
return asOk(taskMap.get(task.id));
});
}
@ -603,8 +792,10 @@ export class TaskStore {
}
const allSortedTasks = claimSort(this.definitions, allTasks);
return { docs: allSortedTasks, versionMap };
const tasksWithDecryptedApiKeys = await this.bulkGetAndMergeTasksWithDecryptedApiKey(
allSortedTasks
);
return { docs: tasksWithDecryptedApiKeys, versionMap };
}
private async search(
@ -627,8 +818,13 @@ export class TaskStore {
} = result;
const versionMap = this.createVersionMap(tasks);
const concreteTasks = this.filterTasks(tasks);
const tasksWithDecryptedApiKeys = await this.bulkGetAndMergeTasksWithDecryptedApiKey(
concreteTasks
);
return {
docs: this.filterTasks(tasks),
docs: tasksWithDecryptedApiKeys,
versionMap,
};
} catch (e) {
@ -776,7 +972,7 @@ export function taskInstanceToAttributes(
id: string
): SerializedConcreteTaskInstance {
return {
...omit(doc, 'id', 'version'),
...omit(doc, 'id', 'version', 'userScope', 'apiKey'),
params: JSON.stringify(doc.params || {}),
state: JSON.stringify(doc.state || {}),
attempts: (doc as ConcreteTaskInstance).attempts || 0,
@ -793,7 +989,7 @@ export function partialTaskInstanceToAttributes(
doc: PartialConcreteTaskInstance
): PartialSerializedConcreteTaskInstance {
return {
...omit(doc, 'id', 'version'),
...omit(doc, 'id', 'version', 'userScope', 'apiKey'),
...(doc.params ? { params: JSON.stringify(doc.params) } : {}),
...(doc.state ? { state: JSON.stringify(doc.state) } : {}),
...(doc.scheduledAt ? { scheduledAt: doc.scheduledAt.toISOString() } : {}),

View file

@ -29,6 +29,11 @@
"@kbn/cloud-plugin",
"@kbn/core-saved-objects-base-server-internal",
"@kbn/core-elasticsearch-server",
"@kbn/es-query",
"@kbn/core-http-server-mocks",
"@kbn/spaces-plugin",
"@kbn/encrypted-saved-objects-shared",
"@kbn/core-saved-objects-api-server-mocks",
],
"exclude": ["target/**/*"]
}

View file

@ -35,6 +35,23 @@ const taskManagerQuery = {
},
};
const taskSchema = schema.object({
task: schema.object({
enabled: schema.boolean({ defaultValue: true }),
taskType: schema.string(),
schedule: schema.maybe(
schema.object({
interval: schema.string(),
})
),
interval: schema.maybe(schema.string()),
params: schema.recordOf(schema.string(), schema.any(), { defaultValue: {} }),
state: schema.recordOf(schema.string(), schema.any(), { defaultValue: {} }),
id: schema.maybe(schema.string()),
timeoutOverride: schema.maybe(schema.string()),
}),
});
export function initRoutes(
router: IRouter,
taskManagerStart: Promise<TaskManagerStartContract>,
@ -56,22 +73,7 @@ export function initRoutes(
},
},
validate: {
body: schema.object({
task: schema.object({
enabled: schema.boolean({ defaultValue: true }),
taskType: schema.string(),
schedule: schema.maybe(
schema.object({
interval: schema.string(),
})
),
interval: schema.maybe(schema.string()),
params: schema.recordOf(schema.string(), schema.any(), { defaultValue: {} }),
state: schema.recordOf(schema.string(), schema.any(), { defaultValue: {} }),
id: schema.maybe(schema.string()),
timeoutOverride: schema.maybe(schema.string()),
}),
}),
body: taskSchema,
},
},
async function (
@ -92,6 +94,37 @@ export function initRoutes(
}
);
router.post(
{
path: `/api/sample_tasks/schedule_with_api_key`,
validate: {
body: taskSchema,
},
security: {
authz: {
enabled: false,
reason: 'This route is opted out from authorization',
},
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<any, any, any, any>,
res: KibanaResponseFactory
): Promise<IKibanaResponse<any>> {
const taskManager = await taskManagerStart;
const { task: taskFields } = req.body;
const task = {
...taskFields,
scope: [scope],
};
const taskResult = await taskManager.schedule(task, { request: req });
return res.ok({ body: taskResult });
}
);
router.post(
{
path: `/api/sample_tasks/run_soon`,

View file

@ -157,6 +157,20 @@ export default function ({ getService }: FtrProviderContext) {
});
}
function scheduleTaskWithApiKey(
task: Partial<ConcreteTaskInstance | DeprecatedConcreteTaskInstance>
): Promise<SerializedConcreteTaskInstance> {
return supertest
.post('/api/sample_tasks/schedule_with_api_key')
.set('kbn-xsrf', 'xxx')
.send({ task })
.expect(200)
.then((response: { body: SerializedConcreteTaskInstance }) => {
log.debug(`Task Scheduled: ${response.body.id}`);
return response.body;
});
}
function runTaskSoon(task: { id: string }) {
return supertest
.post('/api/sample_tasks/run_soon')
@ -416,6 +430,56 @@ export default function ({ getService }: FtrProviderContext) {
});
});
it('should schedule tasks with API keys if request is provided', async () => {
let queryResult = await supertest
.post('/internal/security/api_key/_query')
.send({})
.set('kbn-xsrf', 'xxx')
.expect(200);
const apiKeysLength = queryResult.body.apiKeys.length;
await scheduleTaskWithApiKey({
id: 'test-task-for-sample-task-plugin-to-test-task-api-key',
taskType: 'sampleTask',
params: {},
});
const result = await currentTask('test-task-for-sample-task-plugin-to-test-task-api-key');
expect(result.apiKey).not.empty();
queryResult = await supertest
.post('/internal/security/api_key/_query')
.send({})
.set('kbn-xsrf', 'xxx')
.expect(200);
expect(
queryResult.body.apiKeys.filter((apiKey: { id: string }) => {
return apiKey.id === result.userScope?.apiKeyId;
}).length
).eql(1);
expect(queryResult.body.apiKeys.length).eql(apiKeysLength + 1);
await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
queryResult = await supertest
.post('/internal/security/api_key/_query')
.send({})
.set('kbn-xsrf', 'xxx')
.expect(200);
expect(
queryResult.body.apiKeys.filter((apiKey: { id: string }) => {
return apiKey.id === result.userScope?.apiKeyId;
}).length
).eql(0);
expect(queryResult.body.apiKeys.length).eql(apiKeysLength);
});
it('should return a task run result when asked to run a task now', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',

View file

@ -5354,6 +5354,10 @@
version "0.0.0"
uid ""
"@kbn/encrypted-saved-objects-shared@link:src/platform/packages/shared/kbn-encrypted-saved-objects-shared":
version "0.0.0"
uid ""
"@kbn/enterprise-search-plugin@link:x-pack/solutions/search/plugins/enterprise_search":
version "0.0.0"
uid ""
@ -7586,6 +7590,10 @@
version "0.0.0"
uid ""
"@kbn/task-manager-dependencies-plugin@link:x-pack/platform/plugins/private/task_manager_dependencies":
version "0.0.0"
uid ""
"@kbn/task-manager-fixture-plugin@link:x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture":
version "0.0.0"
uid ""