[Task Manager] Cleans up legacy plugin structure (#80381)

This PR addresses a list of legacy code debt the plugin has incurred over the past year due to extensive changes in its internals and the adoption of the Kibana Platform.

It includes:
1. The `TaskManager` class has been split into several independent components: `TaskTypeDictionary`,  `TaskPollingLifecycle`,  `TaskScheduling`,  `Middleware`. This has made it easier to understand the roles of the different parts and makes it easier to plug them into the observability work.
2. The exposed `mocks` have been corrected to correctly express the Kibana Platform api
3. The lifecycle has been corrected to remove the need for  intermediary streames/promises which we're needed when we first introduced the `setup`/`start` lifecycle to support legacy.
4. The Logger mocks have been replaced with the platform's `coreMocks` implementation
5. The integration tests now test the plugin's actual public api (instead of the internals).
6. The Legacy Elasticsearch client has been replaced with the typed client in response to the deprecation notice.
7. Typing has been narrowed to prevent the `type` field from conflicting with the key in the `TaskDictionary`. This could have caused the displayed `type` on a task to differ from the `type` used in the Dictionary itself (this broke a test during refactoring and could have caused a bug in production code if left).
This commit is contained in:
Gidi Meir Morris 2020-10-20 13:00:13 +01:00 committed by GitHub
parent 3a206ab198
commit 5460ad741c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
69 changed files with 1586 additions and 1638 deletions

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../task_manager/server/mocks';
import { ActionTypeRegistry, ActionTypeRegistryOpts } from './action_type_registry';
import { ActionType, ExecutorType } from './types';
import { ActionExecutor, ExecutorError, ILicenseState, TaskRunnerFactory } from './lib';
@ -13,7 +13,7 @@ import { licenseStateMock } from './lib/license_state.mock';
import { ActionsConfigurationUtilities } from './actions_config';
import { licensingMock } from '../../licensing/server/mocks';
const mockTaskManager = taskManagerMock.setup();
const mockTaskManager = taskManagerMock.createSetup();
let mockedLicenseState: jest.Mocked<ILicenseState>;
let mockedActionsConfig: jest.Mocked<ActionsConfigurationUtilities>;
let actionTypeRegistryParams: ActionTypeRegistryOpts;
@ -66,7 +66,6 @@ describe('register()', () => {
"getRetry": [Function],
"maxAttempts": 1,
"title": "My action type",
"type": "actions:my-action-type",
},
},
]

View file

@ -125,7 +125,6 @@ export class ActionTypeRegistry {
this.taskManager.registerTaskDefinitions({
[`actions:${actionType.id}`]: {
title: actionType.name,
type: `actions:${actionType.id}`,
maxAttempts: actionType.maxAttempts || 1,
getRetry(attempts: number, error: unknown) {
if (error instanceof ExecutorError) {

View file

@ -10,7 +10,7 @@ import { ActionTypeRegistry, ActionTypeRegistryOpts } from './action_type_regist
import { ActionsClient } from './actions_client';
import { ExecutorType, ActionType } from './types';
import { ActionExecutor, TaskRunnerFactory, ILicenseState } from './lib';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../task_manager/server/mocks';
import { actionsConfigMock } from './actions_config.mock';
import { getActionsConfigurationUtilities } from './actions_config';
import { licenseStateMock } from './lib/license_state.mock';
@ -34,7 +34,7 @@ const authorization = actionsAuthorizationMock.create();
const executionEnqueuer = jest.fn();
const request = {} as KibanaRequest;
const mockTaskManager = taskManagerMock.setup();
const mockTaskManager = taskManagerMock.createSetup();
let actionsClient: ActionsClient;
let mockedLicenseState: jest.Mocked<ILicenseState>;

View file

@ -6,7 +6,7 @@
import { ActionExecutor, TaskRunnerFactory } from '../lib';
import { ActionTypeRegistry } from '../action_type_registry';
import { taskManagerMock } from '../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../task_manager/server/mocks';
import { registerBuiltInActionTypes } from './index';
import { Logger } from '../../../../../src/core/server';
import { loggingSystemMock } from '../../../../../src/core/server/mocks';
@ -22,8 +22,8 @@ export function createActionTypeRegistry(): {
} {
const logger = loggingSystemMock.create().get() as jest.Mocked<Logger>;
const actionTypeRegistry = new ActionTypeRegistry({
taskManager: taskManagerMock.createSetup(),
licensing: licensingMock.createSetup(),
taskManager: taskManagerMock.setup(),
taskRunnerFactory: new TaskRunnerFactory(
new ActionExecutor({ isESOUsingEphemeralEncryptionKey: false })
),

View file

@ -6,7 +6,7 @@
import { KibanaRequest } from 'src/core/server';
import uuid from 'uuid';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../task_manager/server/mocks';
import { createExecutionEnqueuerFunction } from './create_execute_function';
import { savedObjectsClientMock } from '../../../../src/core/server/mocks';
import { actionTypeRegistryMock } from './action_type_registry.mock';
@ -15,7 +15,7 @@ import {
asSavedObjectExecutionSource,
} from './lib/action_execution_source';
const mockTaskManager = taskManagerMock.start();
const mockTaskManager = taskManagerMock.createStart();
const savedObjectsClient = savedObjectsClientMock.create();
const request = {} as KibanaRequest;

View file

@ -6,9 +6,9 @@
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { registerActionsUsageCollector } from './actions_usage_collector';
import { taskManagerMock } from '../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../task_manager/server/mocks';
const mockTaskManagerStart = taskManagerMock.start();
const mockTaskManagerStart = taskManagerMock.createStart();
beforeEach(() => jest.resetAllMocks());

View file

@ -39,7 +39,6 @@ function registerActionsTelemetryTask(
taskManager.registerTaskDefinitions({
[TELEMETRY_TASK_TYPE]: {
title: 'Actions usage fetch task',
type: TELEMETRY_TASK_TYPE,
timeout: '5m',
createTaskRunner: telemetryTaskRunner(logger, core, kibanaIndex),
},

View file

@ -7,9 +7,9 @@
import { TaskRunnerFactory } from './task_runner';
import { AlertTypeRegistry } from './alert_type_registry';
import { AlertType } from './types';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../task_manager/server/mocks';
const taskManager = taskManagerMock.setup();
const taskManager = taskManagerMock.createSetup();
const alertTypeRegistryParams = {
taskManager,
taskRunnerFactory: new TaskRunnerFactory(),
@ -118,7 +118,6 @@ describe('register()', () => {
"alerting:test": Object {
"createTaskRunner": [Function],
"title": "Test",
"type": "alerting:test",
},
},
]

View file

@ -86,7 +86,6 @@ export class AlertTypeRegistry {
this.taskManager.registerTaskDefinitions({
[`alerting:${alertType.id}`]: {
title: alertType.name,
type: `alerting:${alertType.id}`,
createTaskRunner: (context: RunContext) =>
this.taskRunnerFactory.create({ ...alertType } as AlertType, context),
},

View file

@ -6,7 +6,7 @@
import { schema } from '@kbn/config-schema';
import { AlertsClient, ConstructorOptions, CreateOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -16,7 +16,7 @@ import { ActionsAuthorization, ActionsClient } from '../../../../actions/server'
import { TaskStatus } from '../../../../task_manager/server';
import { getBeforeSetup, setGlobalDate } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();
const encryptedSavedObjects = encryptedSavedObjectsMock.createClient();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();
const encryptedSavedObjects = encryptedSavedObjectsMock.createClient();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -15,7 +15,7 @@ import { ActionsAuthorization } from '../../../../actions/server';
import { TaskStatus } from '../../../../task_manager/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { nodeTypes } from '../../../../../../src/plugins/data/common';
@ -16,7 +16,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup, setGlobalDate } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup, setGlobalDate } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -19,7 +19,7 @@ import { EventsFactory } from '../../lib/alert_instance_summary_from_event_log.t
import { RawAlert } from '../../types';
import { getBeforeSetup, mockedDateString, setGlobalDate } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();
const eventLogClient = eventLogClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { TaskStatus } from '../../../../task_manager/server';
@ -15,7 +15,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -3,8 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { TaskManager } from '../../../../task_manager/server/task_manager';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { IEventLogClient } from '../../../../event_log/server';
import { actionsClientMock } from '../../../../actions/server/mocks';
import { ConstructorOptions } from '../alerts_client';
@ -41,9 +40,7 @@ export function setGlobalDate() {
export function getBeforeSetup(
alertsClientParams: jest.Mocked<ConstructorOptions>,
taskManager: jest.Mocked<
Pick<TaskManager, 'fetch' | 'get' | 'remove' | 'schedule' | 'runNow' | 'ensureScheduled'>
>,
taskManager: ReturnType<typeof taskManagerMock.createStart>,
alertTypeRegistry: jest.Mocked<Pick<AlertTypeRegistry, 'get' | 'has' | 'register' | 'list'>>,
eventLogClient?: jest.Mocked<IEventLogClient>
) {

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();
const encryptedSavedObjects = encryptedSavedObjectsMock.createClient();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -7,7 +7,7 @@ import uuid from 'uuid';
import { schema } from '@kbn/config-schema';
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { IntervalSchedule } from '../../types';
@ -19,7 +19,7 @@ import { ActionsAuthorization, ActionsClient } from '../../../../actions/server'
import { TaskStatus } from '../../../../task_manager/server';
import { getBeforeSetup, setGlobalDate } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -5,7 +5,7 @@
*/
import { AlertsClient, ConstructorOptions } from '../alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { alertTypeRegistryMock } from '../../alert_type_registry.mock';
import { alertsAuthorizationMock } from '../../authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../../../encrypted_saved_objects/server/mocks';
@ -14,7 +14,7 @@ import { AlertsAuthorization } from '../../authorization/alerts_authorization';
import { ActionsAuthorization } from '../../../../actions/server';
import { getBeforeSetup } from './lib';
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();
const encryptedSavedObjects = encryptedSavedObjectsMock.createClient();

View file

@ -8,7 +8,7 @@ import { cloneDeep } from 'lodash';
import { AlertsClient, ConstructorOptions } from './alerts_client';
import { savedObjectsClientMock, loggingSystemMock } from '../../../../src/core/server/mocks';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../task_manager/server/mocks';
import { alertTypeRegistryMock } from './alert_type_registry.mock';
import { alertsAuthorizationMock } from './authorization/alerts_authorization.mock';
import { encryptedSavedObjectsMock } from '../../encrypted_saved_objects/server/mocks';
@ -25,7 +25,7 @@ const MockAlertId = 'alert-id';
const ConflictAfterRetries = RetryForConflictsAttempts + 1;
const taskManager = taskManagerMock.start();
const taskManager = taskManagerMock.createStart();
const alertTypeRegistry = alertTypeRegistryMock.create();
const unsecuredSavedObjectsClient = savedObjectsClientMock.create();

View file

@ -7,7 +7,7 @@
import { Request } from 'hapi';
import { AlertsClientFactory, AlertsClientFactoryOpts } from './alerts_client_factory';
import { alertTypeRegistryMock } from './alert_type_registry.mock';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../task_manager/server/mocks';
import { KibanaRequest } from '../../../../src/core/server';
import {
savedObjectsClientMock,
@ -35,7 +35,7 @@ const features = featuresPluginMock.createStart();
const securityPluginSetup = securityMock.createSetup();
const alertsClientFactoryParams: jest.Mocked<AlertsClientFactoryOpts> = {
logger: loggingSystemMock.create().get(),
taskManager: taskManagerMock.start(),
taskManager: taskManagerMock.createStart(),
alertTypeRegistry: alertTypeRegistryMock.create(),
getSpaceId: jest.fn(),
getSpace: jest.fn(),

View file

@ -6,8 +6,8 @@
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { registerAlertsUsageCollector } from './alerts_usage_collector';
import { taskManagerMock } from '../../../task_manager/server/task_manager.mock';
const taskManagerStart = taskManagerMock.start();
import { taskManagerMock } from '../../../task_manager/server/mocks';
const taskManagerStart = taskManagerMock.createStart();
beforeEach(() => jest.resetAllMocks());

View file

@ -42,7 +42,6 @@ function registerAlertingTelemetryTask(
taskManager.registerTaskDefinitions({
[TELEMETRY_TASK_TYPE]: {
title: 'Alerting usage fetch task',
type: TELEMETRY_TASK_TYPE,
timeout: '5m',
createTaskRunner: telemetryTaskRunner(logger, core, kibanaIndex),
},

View file

@ -49,7 +49,6 @@ export async function createApmTelemetry({
taskManager.registerTaskDefinitions({
[APM_TELEMETRY_TASK_NAME]: {
title: 'Collect APM usage',
type: APM_TELEMETRY_TASK_NAME,
createTaskRunner: () => {
return {
run: async () => {

View file

@ -48,7 +48,6 @@ function registerLensTelemetryTask(
taskManager.registerTaskDefinitions({
[TELEMETRY_TASK_TYPE]: {
title: 'Lens usage fetch task',
type: TELEMETRY_TASK_TYPE,
timeout: '1m',
createTaskRunner: telemetryTaskRunner(logger, core, config),
},

View file

@ -50,7 +50,6 @@ describe('SessionManagementService', () => {
expect(mockTaskManager.registerTaskDefinitions).toHaveBeenCalledWith({
[SESSION_INDEX_CLEANUP_TASK_NAME]: {
title: 'Cleanup expired or invalid user sessions',
type: SESSION_INDEX_CLEANUP_TASK_NAME,
createTaskRunner: expect.any(Function),
},
});

View file

@ -78,7 +78,6 @@ export class SessionManagementService {
taskManager.registerTaskDefinitions({
[SESSION_INDEX_CLEANUP_TASK_NAME]: {
title: 'Cleanup expired or invalid user sessions',
type: SESSION_INDEX_CLEANUP_TASK_NAME,
createTaskRunner: () => ({ run: () => this.sessionIndex.cleanUp() }),
},
});

View file

@ -39,7 +39,6 @@ export class ManifestTask {
setupContract.taskManager.registerTaskDefinitions({
[ManifestTaskConstants.TYPE]: {
title: 'Security Solution Endpoint Exceptions Handler',
type: ManifestTaskConstants.TYPE,
timeout: ManifestTaskConstants.TIMEOUT,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {

View file

@ -5,53 +5,47 @@
*/
import sinon from 'sinon';
import { mockLogger } from '../test_utils';
import { TaskManager } from '../task_manager';
import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
import {
SavedObjectsSerializer,
SavedObjectTypeRegistry,
SavedObjectsErrorHelpers,
} from '../../../../../src/core/server';
import { SavedObjectsErrorHelpers, Logger } from '../../../../../src/core/server';
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';
import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
import { coreMock } from '../../../../../src/core/server/mocks';
import { TaskManagerConfig } from '../config';
describe('managed configuration', () => {
let taskManager: TaskManager;
let clock: sinon.SinonFakeTimers;
const callAsInternalUser = jest.fn();
const logger = mockLogger();
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
const savedObjectsClient = savedObjectsRepositoryMock.create();
const config = {
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
};
let taskManagerStart: TaskManagerStartContract;
let logger: Logger;
beforeEach(() => {
let clock: sinon.SinonFakeTimers;
const savedObjectsClient = savedObjectsRepositoryMock.create();
beforeEach(async () => {
jest.resetAllMocks();
callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 });
clock = sinon.useFakeTimers();
taskManager = new TaskManager({
config,
logger,
serializer,
callAsInternalUser,
taskManagerId: 'some-uuid',
savedObjectsRepository: savedObjectsClient,
const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
});
taskManager.registerTaskDefinitions({
logger = context.logger.get('taskManager');
const taskManager = new TaskManagerPlugin(context);
(await taskManager.setup(coreMock.createSetup())).registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
taskManager.start();
const coreStart = coreMock.createStart();
coreStart.savedObjects.createInternalRepository.mockReturnValue(savedObjectsClient);
taskManagerStart = await taskManager.start(coreStart);
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
clock.tick(0);
@ -63,15 +57,17 @@ describe('managed configuration', () => {
savedObjectsClient.create.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
// Cause "too many requests" error to be thrown
await expect(
taskManager.schedule({
taskManagerStart.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
);
@ -85,15 +81,17 @@ describe('managed configuration', () => {
savedObjectsClient.create.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
// Cause "too many requests" error to be thrown
await expect(
taskManager.schedule({
taskManagerStart.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
);

View file

@ -4,9 +4,10 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { mockLogger } from '../test_utils';
import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer';
import { mapErr, asOk, asErr, Ok, Err } from './result_type';
import { mockLogger } from '../test_utils';
interface TaskInstance extends Entity {
attempts: number;

View file

@ -8,7 +8,7 @@ import { map } from 'lodash';
import { Subject, race, from } from 'rxjs';
import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators';
import { either, Result, asOk, asErr, Ok, Err } from './result_type';
import { Logger } from '../types';
import { Logger } from '../../../../../src/core/server';
export interface BufferOptions {
bufferMaxDuration?: number;

View file

@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { ensureDeprecatedFieldsAreCorrected } from './correct_deprecated_fields';
import { mockLogger } from '../test_utils';
import { ensureDeprecatedFieldsAreCorrected } from './correct_deprecated_fields';
describe('ensureDeprecatedFieldsAreCorrected', () => {
test('doesnt change tasks without any schedule fields', async () => {

View file

@ -5,7 +5,7 @@
*/
import { TaskInstance, TaskInstanceWithDeprecatedFields } from '../task';
import { Logger } from '../types';
import { Logger } from '../../../../../src/core/server';
export function ensureDeprecatedFieldsAreCorrected(
{ id, taskType, interval, schedule, ...taskInstance }: TaskInstanceWithDeprecatedFields,

View file

@ -6,12 +6,12 @@
import sinon from 'sinon';
import { Subject } from 'rxjs';
import { mockLogger } from '../test_utils';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import {
createManagedConfiguration,
ADJUST_THROUGHPUT_INTERVAL,
} from './create_managed_configuration';
import { mockLogger } from '../test_utils';
describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;

View file

@ -7,7 +7,7 @@
import { interval, merge, of, Observable } from 'rxjs';
import { filter, mergeScan, map, scan, distinctUntilChanged, startWith } from 'rxjs/operators';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import { Logger } from '../types';
import { Logger } from '../../../../../src/core/server';
const FLUSH_MARKER = Symbol('flush');
export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
@ -31,7 +31,7 @@ interface ManagedConfigurationOpts {
errors$: Observable<Error>;
}
interface ManagedConfiguration {
export interface ManagedConfiguration {
maxWorkersConfiguration$: Observable<number>;
pollIntervalConfiguration$: Observable<number>;
}

View file

@ -6,49 +6,37 @@
import { RunContext, TaskInstance } from '../task';
/*
* BeforeSaveMiddlewareParams is nearly identical to RunContext, but
* taskInstance is before save (no _id property)
*
* taskInstance property is guaranteed to exist. The params can optionally
* include fields from an "options" object passed as the 2nd parameter to
* taskManager.schedule()
*/
export interface BeforeSaveMiddlewareParams {
type Mapper<T> = (params: T) => Promise<T>;
interface BeforeSaveContext {
taskInstance: TaskInstance;
}
export type BeforeSaveFunction = (
params: BeforeSaveMiddlewareParams
) => Promise<BeforeSaveMiddlewareParams>;
export type BeforeRunFunction = (params: RunContext) => Promise<RunContext>;
export type BeforeMarkRunningFunction = (params: RunContext) => Promise<RunContext>;
export type BeforeSaveContextFunction = Mapper<BeforeSaveContext>;
export type BeforeRunContextFunction = Mapper<RunContext>;
export interface Middleware {
beforeSave: BeforeSaveFunction;
beforeRun: BeforeRunFunction;
beforeMarkRunning: BeforeMarkRunningFunction;
beforeSave: BeforeSaveContextFunction;
beforeRun: BeforeRunContextFunction;
beforeMarkRunning: BeforeRunContextFunction;
}
export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) {
const beforeSave = middleware.beforeSave
? (params: BeforeSaveMiddlewareParams) =>
middleware.beforeSave(params).then(prevMiddleware.beforeSave)
: prevMiddleware.beforeSave;
const beforeRun = middleware.beforeRun
? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun)
: prevMiddleware.beforeRun;
const beforeMarkRunning = middleware.beforeMarkRunning
? (params: RunContext) =>
middleware.beforeMarkRunning(params).then(prevMiddleware.beforeMarkRunning)
: prevMiddleware.beforeMarkRunning;
export function addMiddlewareToChain(prev: Middleware, next: Partial<Middleware>) {
return {
beforeSave,
beforeRun,
beforeMarkRunning,
beforeSave: next.beforeSave ? chain(prev.beforeSave, next.beforeSave) : prev.beforeSave,
beforeRun: next.beforeRun ? chain(prev.beforeRun, next.beforeRun) : prev.beforeRun,
beforeMarkRunning: next.beforeMarkRunning
? chain(prev.beforeMarkRunning, next.beforeMarkRunning)
: prev.beforeMarkRunning,
};
}
const chain = <T>(prev: Mapper<T>, next: Mapper<T>): Mapper<T> => (params) =>
next(params).then(prev);
export function createInitialMiddleware(): Middleware {
return {
beforeSave: async (saveOpts: BeforeSaveContext) => saveOpts,
beforeRun: async (runOpts: RunContext) => runOpts,
beforeMarkRunning: async (runOpts: RunContext) => runOpts,
};
}

View file

@ -1,25 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Joi from 'joi';
import { TaskDefinition, TaskDictionary, validateTaskDefinition } from '../task';
/**
* Sanitizes the system's task definitions. Task definitions have optional properties, and
* this ensures they all are given a reasonable default.
*
* @param taskDefinitions - The Kibana task definitions dictionary
*/
export function sanitizeTaskDefinitions(
taskDefinitions: TaskDictionary<TaskDefinition> = {}
): TaskDictionary<TaskDefinition> {
return Object.keys(taskDefinitions).reduce((acc, type) => {
const rawDefinition = taskDefinitions[type];
rawDefinition.type = type;
acc[type] = Joi.attempt(rawDefinition, validateTaskDefinition) as TaskDefinition;
return acc;
}, {} as TaskDictionary<TaskDefinition>);
}

View file

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManagerPlugin } from './plugin';
import { coreMock } from '../../../../src/core/server/mocks';
import { TaskManagerConfig } from './config';
describe('TaskManagerPlugin', () => {
describe('setup', () => {
test('throws if no valid UUID is available', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>({
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
});
pluginInitializerContext.env.instanceUuid = '';
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
expect(taskManagerPlugin.setup(coreMock.createSetup())).rejects.toEqual(
new Error(`TaskManager is unable to start as Kibana has no valid UUID assigned to it.`)
);
});
test('throws if setup methods are called after start', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>({
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
});
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
const setupApi = await taskManagerPlugin.setup(coreMock.createSetup());
await taskManagerPlugin.start(coreMock.createStart());
expect(() =>
setupApi.addMiddleware({
beforeSave: async (saveOpts) => saveOpts,
beforeRun: async (runOpts) => runOpts,
beforeMarkRunning: async (runOpts) => runOpts,
})
).toThrowErrorMatchingInlineSnapshot(
`"Cannot add Middleware after the task manager has started"`
);
expect(() =>
setupApi.registerTaskDefinitions({
lateRegisteredType: {
title: 'lateRegisteredType',
createTaskRunner: () => ({ async run() {} }),
},
})
).toThrowErrorMatchingInlineSnapshot(
`"Cannot register task definitions after the task manager has started"`
);
});
});
});

View file

@ -3,92 +3,140 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { PluginInitializerContext, Plugin, CoreSetup, CoreStart } from 'src/core/server';
import { Subject } from 'rxjs';
import { PluginInitializerContext, Plugin, CoreSetup, Logger, CoreStart } from 'src/core/server';
import { first } from 'rxjs/operators';
import { TaskDictionary, TaskDefinition } from './task';
import { TaskManager } from './task_manager';
import { TaskDefinition } from './task';
import { TaskPollingLifecycle } from './polling_lifecycle';
import { TaskManagerConfig } from './config';
import { Middleware } from './lib/middleware';
import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware';
import { setupSavedObjects } from './saved_objects';
import { TaskTypeDictionary } from './task_type_dictionary';
import { FetchResult, SearchOpts, TaskStore } from './task_store';
import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskScheduling } from './task_scheduling';
export type TaskManagerSetupContract = Pick<
TaskManager,
'addMiddleware' | 'registerTaskDefinitions'
export type TaskManagerSetupContract = { addMiddleware: (middleware: Middleware) => void } & Pick<
TaskTypeDictionary,
'registerTaskDefinitions'
>;
export type TaskManagerStartContract = Pick<
TaskManager,
'fetch' | 'get' | 'remove' | 'schedule' | 'runNow' | 'ensureScheduled'
>;
TaskScheduling,
'schedule' | 'runNow' | 'ensureScheduled'
> &
Pick<TaskStore, 'fetch' | 'get' | 'remove'>;
export class TaskManagerPlugin
implements Plugin<TaskManagerSetupContract, TaskManagerStartContract> {
legacyTaskManager$: Subject<TaskManager> = new Subject<TaskManager>();
taskManager: Promise<TaskManager> = this.legacyTaskManager$.pipe(first()).toPromise();
currentConfig: TaskManagerConfig;
taskManagerId?: string;
config?: TaskManagerConfig;
private taskPollingLifecycle?: TaskPollingLifecycle;
private taskManagerId?: string;
private config?: TaskManagerConfig;
private logger: Logger;
private definitions: TaskTypeDictionary;
private middleware: Middleware = createInitialMiddleware();
constructor(private readonly initContext: PluginInitializerContext) {
this.initContext = initContext;
this.currentConfig = {} as TaskManagerConfig;
this.logger = initContext.logger.get();
this.definitions = new TaskTypeDictionary(this.logger);
}
public async setup(core: CoreSetup): Promise<TaskManagerSetupContract> {
public async setup({ savedObjects }: CoreSetup): Promise<TaskManagerSetupContract> {
this.config = await this.initContext.config
.create<TaskManagerConfig>()
.pipe(first())
.toPromise();
setupSavedObjects(core.savedObjects, this.config);
setupSavedObjects(savedObjects, this.config);
this.taskManagerId = this.initContext.env.instanceUuid;
if (!this.taskManagerId) {
this.logger.error(
`TaskManager is unable to start as there the Kibana UUID is invalid (value of the "server.uuid" configuration is ${this.taskManagerId})`
);
throw new Error(`TaskManager is unable to start as Kibana has no valid UUID assigned to it.`);
} else {
this.logger.info(`TaskManager is identified by the Kibana UUID: ${this.taskManagerId}`);
}
return {
addMiddleware: (middleware: Middleware) => {
this.taskManager.then((tm) => tm.addMiddleware(middleware));
this.assertStillInSetup('add Middleware');
this.middleware = addMiddlewareToChain(this.middleware, middleware);
},
registerTaskDefinitions: (taskDefinition: TaskDictionary<TaskDefinition>) => {
this.taskManager.then((tm) => tm.registerTaskDefinitions(taskDefinition));
registerTaskDefinitions: (taskDefinition: Record<string, TaskDefinition>) => {
this.assertStillInSetup('register task definitions');
this.definitions.registerTaskDefinitions(taskDefinition);
},
};
}
public start({ savedObjects, elasticsearch }: CoreStart): TaskManagerStartContract {
const logger = this.initContext.logger.get('taskManager');
const savedObjectsRepository = savedObjects.createInternalRepository(['task']);
this.legacyTaskManager$.next(
new TaskManager({
taskManagerId: this.taskManagerId!,
config: this.config!,
savedObjectsRepository,
serializer: savedObjects.createSerializer(),
callAsInternalUser: elasticsearch.legacy.client.callAsInternalUser,
logger,
})
);
this.legacyTaskManager$.complete();
const taskStore = new TaskStore({
serializer: savedObjects.createSerializer(),
savedObjectsRepository,
esClient: elasticsearch.createClient('taskManager').asInternalUser,
index: this.config!.index,
maxAttempts: this.config!.max_attempts,
definitions: this.definitions,
taskManagerId: `kibana:${this.taskManagerId!}`,
});
// we need to "drain" any calls made to the seup API
// before `starting` TaskManager. This is a legacy relic
// of the old API that should be resolved once we split
// Task manager into two services, setup and start, instead
// of the single instance of TaskManager
this.taskManager.then((tm) => tm.start());
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
logger: this.logger,
errors$: taskStore.errors$,
startingMaxWorkers: this.config!.max_workers,
startingPollInterval: this.config!.poll_interval,
});
const taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
logger: this.logger,
taskStore,
middleware: this.middleware,
maxWorkersConfiguration$,
pollIntervalConfiguration$,
});
this.taskPollingLifecycle = taskPollingLifecycle;
const taskScheduling = new TaskScheduling({
logger: this.logger,
taskStore,
middleware: this.middleware,
taskPollingLifecycle,
});
// start polling for work
taskPollingLifecycle.start();
return {
fetch: (...args) => this.taskManager.then((tm) => tm.fetch(...args)),
get: (...args) => this.taskManager.then((tm) => tm.get(...args)),
remove: (...args) => this.taskManager.then((tm) => tm.remove(...args)),
schedule: (...args) => this.taskManager.then((tm) => tm.schedule(...args)),
runNow: (...args) => this.taskManager.then((tm) => tm.runNow(...args)),
ensureScheduled: (...args) => this.taskManager.then((tm) => tm.ensureScheduled(...args)),
fetch: (opts: SearchOpts): Promise<FetchResult> => taskStore.fetch(opts),
get: (id: string) => taskStore.get(id),
remove: (id: string) => taskStore.remove(id),
schedule: (...args) => taskScheduling.schedule(...args),
ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
runNow: (...args) => taskScheduling.runNow(...args),
};
}
public stop() {
this.taskManager.then((tm) => {
tm.stop();
});
if (this.taskPollingLifecycle) {
this.taskPollingLifecycle.stop();
}
}
/**
* Ensures task manager hasn't started
*
* @param {string} the name of the operation being executed
* @returns void
*/
private assertStillInSetup(operation: string) {
if (this.taskPollingLifecycle?.isStarted) {
throw new Error(`Cannot ${operation} after the task manager has started`);
}
}
}

View file

@ -15,7 +15,7 @@ import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
import { Logger } from '../types';
import { Logger } from '../../../../../src/core/server';
import { pullFromSet } from '../lib/pull_from_set';
import {
Result,

View file

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskPollingLifecycle, TaskLifecycleEvent } from './polling_lifecycle';
import { of, Observable } from 'rxjs';
export const taskPollingLifecycleMock = {
create(opts: { isStarted?: boolean; events$?: Observable<TaskLifecycleEvent> }) {
return ({
start: jest.fn(),
attemptToRun: jest.fn(),
get isStarted() {
return opts.isStarted ?? true;
},
get events() {
return opts.events$ ?? of();
},
stop: jest.fn(),
} as unknown) as jest.Mocked<TaskPollingLifecycle>;
},
};

View file

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import _ from 'lodash';
import sinon from 'sinon';
import { of } from 'rxjs';
import { TaskPollingLifecycle, claimAvailableTasks } from './polling_lifecycle';
import { createInitialMiddleware } from './lib/middleware';
import { TaskTypeDictionary } from './task_type_dictionary';
import { taskStoreMock } from './task_store.mock';
import { mockLogger } from './test_utils';
describe('TaskPollingLifecycle', () => {
let clock: sinon.SinonFakeTimers;
const taskManagerLogger = mockLogger();
const mockTaskStore = taskStoreMock.create({});
const taskManagerOpts = {
config: {
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
definitions: new TaskTypeDictionary(taskManagerLogger),
middleware: createInitialMiddleware(),
maxWorkersConfiguration$: of(100),
pollIntervalConfiguration$: of(100),
};
beforeEach(() => {
clock = sinon.useFakeTimers();
taskManagerOpts.definitions = new TaskTypeDictionary(taskManagerLogger);
});
afterEach(() => clock.restore());
describe('start', () => {
test('begins polling once start is called', () => {
const taskManager = new TaskPollingLifecycle(taskManagerOpts);
clock.tick(150);
expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled();
taskManager.start();
clock.tick(150);
expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled();
});
});
describe('claimAvailableTasks', () => {
test('should claim Available Tasks when there are available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));
const availableWorkers = 1;
claimAvailableTasks([], claim, availableWorkers, logger);
expect(claim).toHaveBeenCalledTimes(1);
});
test('should not claim Available Tasks when there are no available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));
const availableWorkers = 0;
claimAvailableTasks([], claim, availableWorkers, logger);
expect(claim).not.toHaveBeenCalled();
});
/**
* This handles the case in which Elasticsearch has had inline script disabled.
* This is achieved by setting the `script.allowed_types` flag on Elasticsearch to `none`
*/
test('handles failure due to inline scripts being disabled', () => {
const logger = mockLogger();
const claim = jest.fn(() => {
throw Object.assign(new Error(), {
response:
'{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":".kibana_task_manager_1","node":"24A4QbjHSK6prvtopAKLKw","reason":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}}],"caused_by":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts","caused_by":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}}},"status":400}',
});
});
claimAvailableTasks([], claim, 10, logger);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
`Task Manager cannot operate when inline scripts are disabled in Elasticsearch`
);
});
});
});

View file

@ -0,0 +1,259 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Subject, Observable, Subscription } from 'rxjs';
import { performance } from 'perf_hooks';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
import { Logger } from '../../../../src/core/server';
import { Result, asErr, mapErr } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig } from './config';
import {
TaskMarkRunning,
TaskRun,
TaskClaim,
TaskRunRequest,
asTaskRunRequestEvent,
} from './task_events';
import { fillPool, FillPoolResult } from './lib/fill_pool';
import { Middleware } from './lib/middleware';
import { intervalFromNow } from './lib/intervals';
import { ConcreteTaskInstance } from './task';
import {
createTaskPoller,
PollingError,
PollingErrorType,
createObservableMonitor,
} from './polling';
import { TaskPool } from './task_pool';
import { TaskManagerRunner, TaskRunner } from './task_runner';
import { TaskStore, OwnershipClaimingOpts, ClaimOwnershipResult } from './task_store';
import { identifyEsError } from './lib/identify_es_error';
import { BufferedTaskStore } from './buffered_task_store';
import { TaskTypeDictionary } from './task_type_dictionary';
export type TaskPollingLifecycleOpts = {
logger: Logger;
definitions: TaskTypeDictionary;
taskStore: TaskStore;
config: TaskManagerConfig;
middleware: Middleware;
} & ManagedConfiguration;
export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRunRequest;
/**
* The public interface into the task manager system.
*/
export class TaskPollingLifecycle {
private definitions: TaskTypeDictionary;
private store: TaskStore;
private bufferedStore: BufferedTaskStore;
private logger: Logger;
private pool: TaskPool;
// all task related events (task claimed, task marked as running, etc.) are emitted through events$
private events$ = new Subject<TaskLifecycleEvent>();
// all on-demand requests we wish to pipe into the poller
private claimRequests$ = new Subject<Option<string>>();
// the task poller that polls for work on fixed intervals and on demand
private poller$: Observable<Result<FillPoolResult, PollingError<string>>>;
// our subscription to the poller
private pollingSubscription: Subscription = Subscription.EMPTY;
private middleware: Middleware;
/**
* Initializes the task manager, preventing any further addition of middleware,
* enabling the task manipulation methods, and beginning the background polling
* mechanism.
*/
constructor(opts: TaskPollingLifecycleOpts) {
const { logger, middleware, maxWorkersConfiguration$, pollIntervalConfiguration$ } = opts;
this.logger = logger;
this.middleware = middleware;
this.definitions = opts.definitions;
this.store = opts.taskStore;
// pipe store events into the lifecycle event stream
this.store.events.subscribe((event) => this.events$.next(event));
this.bufferedStore = new BufferedTaskStore(this.store, {
bufferMaxOperations: opts.config.max_workers,
logger: this.logger,
});
this.pool = new TaskPool({
logger: this.logger,
maxWorkers$: maxWorkersConfiguration$,
});
const {
max_poll_inactivity_cycles: maxPollInactivityCycles,
poll_interval: pollInterval,
} = opts.config;
this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
() =>
createTaskPoller<string, FillPoolResult>({
logger: this.logger,
pollInterval$: pollIntervalConfiguration$,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: pollInterval * maxPollInactivityCycles,
}),
{
heartbeatInterval: pollInterval,
// Time out the poller itself if it has failed to complete the entire stream for a certain amount of time.
// This is different that the `work` timeout above, as the poller could enter an invalid state where
// it fails to complete a cycle even thought `work` is completing quickly.
// We grant it a single cycle longer than the time alotted to `work` so that timing out the `work`
// doesn't get short circuited by the monitor reinstantiating the poller all together (a far more expensive
// operation than just timing out the `work` internally)
inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1),
onError: (error) => {
this.logger.error(`[Task Poller Monitor]: ${error.message}`);
},
}
);
}
public get events(): Observable<TaskLifecycleEvent> {
return this.events$;
}
private emitEvent = (event: TaskLifecycleEvent) => {
this.events$.next(event);
};
public attemptToRun(task: string) {
this.claimRequests$.next(some(task));
}
private createTaskRunnerForTask = (instance: ConcreteTaskInstance) => {
return new TaskManagerRunner({
logger: this.logger,
instance,
store: this.bufferedStore,
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
beforeMarkRunning: this.middleware.beforeMarkRunning,
onTaskEvent: this.emitEvent,
});
};
public get isStarted() {
return !this.pollingSubscription.closed;
}
private pollForWork = async (...tasksToClaim: string[]): Promise<FillPoolResult> => {
return fillPool(
// claim available tasks
() =>
claimAvailableTasks(
tasksToClaim.splice(0, this.pool.availableWorkers),
this.store.claimAvailableTasks,
this.pool.availableWorkers,
this.logger
),
// wrap each task in a Task Runner
this.createTaskRunnerForTask,
// place tasks in the Task Pool
async (tasks: TaskRunner[]) => await this.pool.run(tasks)
);
};
/**
* Starts up the task manager and starts picking up tasks.
*/
public start() {
if (!this.isStarted) {
this.pollingSubscription = this.poller$.subscribe(
mapErr((error: PollingError<string>) => {
if (error.type === PollingErrorType.RequestCapacityReached) {
pipe(
error.data,
mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
);
}
this.logger.error(error.message);
})
);
}
}
/**
* Stops the task manager and cancels running tasks.
*/
public stop() {
if (this.isStarted) {
this.pollingSubscription.unsubscribe();
this.pool.cancelRunningTasks();
}
}
}
export async function claimAvailableTasks(
claimTasksById: string[],
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
availableWorkers: number,
logger: Logger
) {
if (availableWorkers > 0) {
performance.mark('claimAvailableTasks_start');
try {
const { docs, claimedTasks } = await claim({
size: availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
claimTasksById,
});
if (claimedTasks === 0) {
performance.mark('claimAvailableTasks.noTasks');
}
performance.mark('claimAvailableTasks_stop');
performance.measure(
'claimAvailableTasks',
'claimAvailableTasks_start',
'claimAvailableTasks_stop'
);
if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${
docs.length
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return docs;
} catch (ex) {
if (identifyEsError(ex).includes('cannot execute [inline] scripts')) {
logger.warn(
`Task Manager cannot operate when inline scripts are disabled in Elasticsearch`
);
} else {
throw ex;
}
}
} else {
performance.mark('claimAvailableTasks.noAvailableWorkers');
logger.debug(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.`
);
}
return [];
}

View file

@ -23,23 +23,23 @@ import {
SortByRunAtAndRetryAt,
} from './mark_available_tasks_as_claimed';
import { TaskDictionary, TaskDefinition } from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { mockLogger } from '../test_utils';
describe('mark_available_tasks_as_claimed', () => {
test('generates query matching tasks to be claimed when polling for tasks', () => {
const definitions: TaskDictionary<TaskDefinition> = {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
sampleTask: {
type: 'sampleTask',
title: 'title',
maxAttempts: 5,
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
otherTask: {
type: 'otherTask',
title: 'title',
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
};
});
const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
@ -53,7 +53,7 @@ describe('mark_available_tasks_as_claimed', () => {
// Either task has an schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
TaskWithSchedule,
...Object.entries(definitions).map(([type, { maxAttempts }]) =>
...Array.from(definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || defaultMaxAttempts)
)
)

View file

@ -24,12 +24,6 @@ import Joi from 'joi';
*/
type Require<T extends object, P extends keyof T> = Omit<T, P> & Required<Pick<T, P>>;
/**
* A loosely typed definition of the elasticjs wrapper. It's beyond the scope
* of this work to try to make a comprehensive type definition of this.
*/
export type ElasticJs = (action: string, args: unknown) => Promise<unknown>;
/**
* The run context is passed into a task's run function as its sole argument.
*/
@ -154,13 +148,6 @@ export const validateTaskDefinition = Joi.object({
getRetry: Joi.func().optional(),
}).default();
/**
* A dictionary mapping task types to their definitions.
*/
export interface TaskDictionary<T extends TaskDefinition> {
[taskType: string]: T;
}
export enum TaskStatus {
Idle = 'idle',
Claiming = 'claiming',

View file

@ -1,30 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManagerSetupContract, TaskManagerStartContract } from './plugin';
export const taskManagerMock = {
setup(overrides: Partial<jest.Mocked<TaskManagerSetupContract>> = {}) {
const mocked: jest.Mocked<TaskManagerSetupContract> = {
registerTaskDefinitions: jest.fn(),
addMiddleware: jest.fn(),
...overrides,
};
return mocked;
},
start(overrides: Partial<jest.Mocked<TaskManagerStartContract>> = {}) {
const mocked: jest.Mocked<TaskManagerStartContract> = {
ensureScheduled: jest.fn(),
schedule: jest.fn(),
fetch: jest.fn(),
get: jest.fn(),
runNow: jest.fn(),
remove: jest.fn(),
...overrides,
};
return mocked;
},
};

View file

@ -1,499 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import _ from 'lodash';
import sinon from 'sinon';
import { Subject } from 'rxjs';
import { none } from 'fp-ts/lib/Option';
import {
asTaskMarkRunningEvent,
asTaskRunEvent,
asTaskClaimEvent,
asTaskRunRequestEvent,
} from './task_events';
import {
TaskManager,
claimAvailableTasks,
awaitTaskRunResult,
TaskLifecycleEvent,
} from './task_manager';
import { savedObjectsRepositoryMock } from '../../../../src/core/server/mocks';
import { SavedObjectsSerializer, SavedObjectTypeRegistry } from '../../../../src/core/server';
import { mockLogger } from './test_utils';
import { asErr, asOk } from './lib/result_type';
import { ConcreteTaskInstance, TaskLifecycleResult, TaskStatus } from './task';
import { Middleware } from './lib/middleware';
const savedObjectsClient = savedObjectsRepositoryMock.create();
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
describe('TaskManager', () => {
let clock: sinon.SinonFakeTimers;
const config = {
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
};
const taskManagerOpts = {
config,
savedObjectsRepository: savedObjectsClient,
serializer,
callAsInternalUser: jest.fn(),
logger: mockLogger(),
taskManagerId: 'some-uuid',
};
beforeEach(() => {
clock = sinon.useFakeTimers();
});
afterEach(() => clock.restore());
test('throws if no valid UUID is available', async () => {
expect(() => {
new TaskManager({
...taskManagerOpts,
taskManagerId: '',
});
}).toThrowErrorMatchingInlineSnapshot(
`"TaskManager is unable to start as Kibana has no valid UUID assigned to it."`
);
});
test('allows and queues scheduling tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
const task = {
taskType: 'foo',
params: {},
state: {},
};
savedObjectsClient.create.mockResolvedValueOnce({
id: '1',
type: 'task',
attributes: {},
references: [],
});
const promise = client.schedule(task);
client.start();
await promise;
expect(savedObjectsClient.create).toHaveBeenCalled();
});
test('allows scheduling tasks after starting', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
client.start();
const task = {
taskType: 'foo',
params: {},
state: {},
};
savedObjectsClient.create.mockResolvedValueOnce({
id: '1',
type: 'task',
attributes: {},
references: [],
});
await client.schedule(task);
expect(savedObjectsClient.create).toHaveBeenCalled();
});
test('allows scheduling existing tasks that may have already been scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 409,
});
client.start();
const result = await client.ensureScheduled({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
});
expect(result.id).toEqual('my-foo-id');
});
test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 500,
});
client.start();
return expect(
client.ensureScheduled({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 500,
});
});
test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 409,
});
client.start();
return expect(
client.schedule({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 409,
});
});
test('allows and queues removing tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
savedObjectsClient.delete.mockResolvedValueOnce({});
const promise = client.remove('1');
client.start();
await promise;
expect(savedObjectsClient.delete).toHaveBeenCalled();
});
test('allows removing tasks after starting', async () => {
const client = new TaskManager(taskManagerOpts);
client.start();
savedObjectsClient.delete.mockResolvedValueOnce({});
await client.remove('1');
expect(savedObjectsClient.delete).toHaveBeenCalled();
});
test('allows and queues fetching tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
taskManagerOpts.callAsInternalUser.mockResolvedValue({
hits: {
total: {
value: 0,
},
hits: [],
},
});
const promise = client.fetch({});
client.start();
await promise;
expect(taskManagerOpts.callAsInternalUser).toHaveBeenCalled();
});
test('allows fetching tasks after starting', async () => {
const client = new TaskManager(taskManagerOpts);
client.start();
taskManagerOpts.callAsInternalUser.mockResolvedValue({
hits: {
total: {
value: 0,
},
hits: [],
},
});
await client.fetch({});
expect(taskManagerOpts.callAsInternalUser).toHaveBeenCalled();
});
test('allows middleware registration before starting', () => {
const client = new TaskManager(taskManagerOpts);
const middleware: Middleware = {
beforeSave: jest.fn(async (saveOpts) => saveOpts),
beforeRun: jest.fn(async (runOpts) => runOpts),
beforeMarkRunning: jest.fn(async (runOpts) => runOpts),
};
expect(() => client.addMiddleware(middleware)).not.toThrow();
});
test('disallows middleware registration after starting', async () => {
const client = new TaskManager(taskManagerOpts);
const middleware: Middleware = {
beforeSave: jest.fn(async (saveOpts) => saveOpts),
beforeRun: jest.fn(async (runOpts) => runOpts),
beforeMarkRunning: jest.fn(async (runOpts) => runOpts),
};
client.start();
expect(() => client.addMiddleware(middleware)).toThrow(
/Cannot add middleware after the task manager is initialized/i
);
});
describe('runNow', () => {
describe('awaitTaskRunResult', () => {
test('resolves when the task run succeeds', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn();
const result = awaitTaskRunResult(id, events$, getLifecycle);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskRunEvent(id, asOk(task)));
return expect(result).resolves.toEqual({ id });
});
test('rejects when the task run fails', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn();
const result = awaitTaskRunResult(id, events$, getLifecycle);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
test('rejects when the task mark as running fails', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn();
const result = awaitTaskRunResult(id, events$, getLifecycle);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
test('when a task claim fails we ensure the task exists', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskLifecycleResult.NotFound);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it does not exist`)
);
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we ensure the task isnt already claimed', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Claiming);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
);
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we ensure the task isnt already running', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Running);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
);
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('rejects when the task run fails due to capacity', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Idle);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskRunRequestEvent(id, asErr(new Error('failed to buffer request'))));
await expect(result).rejects.toEqual(
new Error(
`Failed to run task "${id}" as Task Manager is at capacity, please try again later`
)
);
expect(getLifecycle).not.toHaveBeenCalled();
});
test('when a task claim fails we return the underlying error if the task is idle', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Idle);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]`
);
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we return the underlying error if the task is failed', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const getLifecycle = jest.fn(async () => TaskStatus.Failed);
const result = awaitTaskRunResult(id, events$, getLifecycle);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]`
);
expect(getLifecycle).toHaveBeenCalledWith(id);
});
test('ignores task run success of other tasks', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const differentTask = '4bebf429-181b-4518-bb7d-b4246d8a35f0';
const getLifecycle = jest.fn();
const result = awaitTaskRunResult(id, events$, getLifecycle);
const task = { id } as ConcreteTaskInstance;
const otherTask = { id: differentTask } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskClaimEvent(differentTask, asOk(otherTask)));
events$.next(asTaskRunEvent(differentTask, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
});
});
describe('claimAvailableTasks', () => {
test('should claim Available Tasks when there are available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));
const availableWorkers = 1;
claimAvailableTasks([], claim, availableWorkers, logger);
expect(claim).toHaveBeenCalledTimes(1);
});
test('should not claim Available Tasks when there are no available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));
const availableWorkers = 0;
claimAvailableTasks([], claim, availableWorkers, logger);
expect(claim).not.toHaveBeenCalled();
});
/**
* This handles the case in which Elasticsearch has had inline script disabled.
* This is achieved by setting the `script.allowed_types` flag on Elasticsearch to `none`
*/
test('handles failure due to inline scripts being disabled', () => {
const logger = mockLogger();
const claim = jest.fn(() => {
throw Object.assign(new Error(), {
msg: '[illegal_argument_exception] cannot execute [inline] scripts',
path: '/.kibana_task_manager/_update_by_query',
query: {
ignore_unavailable: true,
refresh: true,
max_docs: 200,
conflicts: 'proceed',
},
body:
'{"query":{"bool":{"must":[{"term":{"type":"task"}},{"bool":{"must":[{"bool":{"should":[{"bool":{"must":[{"term":{"task.status":"idle"}},{"range":{"task.runAt":{"lte":"now"}}}]}},{"bool":{"must":[{"bool":{"should":[{"term":{"task.status":"running"}},{"term":{"task.status":"claiming"}}]}},{"range":{"task.retryAt":{"lte":"now"}}}]}}]}},{"bool":{"should":[{"exists":{"field":"task.schedule"}},{"bool":{"must":[{"term":{"task.taskType":"vis_telemetry"}},{"range":{"task.attempts":{"lt":3}}}]}},{"bool":{"must":[{"term":{"task.taskType":"lens_telemetry"}},{"range":{"task.attempts":{"lt":3}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.server-log"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.slack"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.email"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.index"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.pagerduty"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.webhook"}},{"range":{"task.attempts":{"lt":1}}}]}}]}}]}}]}},"sort":{"_script":{"type":"number","order":"asc","script":{"lang":"expression","source":"doc[\'task.retryAt\'].value || doc[\'task.runAt\'].value"}}},"seq_no_primary_term":true,"script":{"source":"ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;","lang":"painless","params":{"ownerId":"kibana:5b2de169-2785-441b-ae8c-186a1936b17d","retryAt":"2019-10-31T13:35:43.579Z","status":"claiming"}}}',
statusCode: 400,
response:
'{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":".kibana_task_manager_1","node":"24A4QbjHSK6prvtopAKLKw","reason":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}}],"caused_by":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts","caused_by":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}}},"status":400}',
});
});
claimAvailableTasks([], claim, 10, logger);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot(
`"Task Manager cannot operate when inline scripts are disabled in Elasticsearch"`
);
});
});
});

View file

@ -1,544 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Subject, Observable, Subscription } from 'rxjs';
import { filter } from 'rxjs/operators';
import { performance } from 'perf_hooks';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
import {
SavedObjectsSerializer,
ILegacyScopedClusterClient,
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig } from './config';
import { Logger } from './types';
import {
TaskMarkRunning,
TaskRun,
TaskClaim,
TaskRunRequest,
isTaskRunEvent,
isTaskClaimEvent,
isTaskRunRequestEvent,
asTaskRunRequestEvent,
} from './task_events';
import { fillPool, FillPoolResult } from './lib/fill_pool';
import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware';
import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions';
import { intervalFromNow } from './lib/intervals';
import {
TaskDefinition,
TaskDictionary,
ConcreteTaskInstance,
RunContext,
TaskInstanceWithId,
TaskInstanceWithDeprecatedFields,
TaskLifecycle,
TaskLifecycleResult,
TaskStatus,
ElasticJs,
} from './task';
import {
createTaskPoller,
PollingError,
PollingErrorType,
createObservableMonitor,
} from './polling';
import { TaskPool } from './task_pool';
import { TaskManagerRunner, TaskRunner } from './task_runner';
import {
FetchResult,
TaskStore,
OwnershipClaimingOpts,
ClaimOwnershipResult,
SearchOpts,
} from './task_store';
import { identifyEsError } from './lib/identify_es_error';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
import { BufferedTaskStore } from './buffered_task_store';
const VERSION_CONFLICT_STATUS = 409;
export interface TaskManagerOpts {
logger: Logger;
config: TaskManagerConfig;
callAsInternalUser: ILegacyScopedClusterClient['callAsInternalUser'];
savedObjectsRepository: ISavedObjectsRepository;
serializer: SavedObjectsSerializer;
taskManagerId: string;
}
interface RunNowResult {
id: string;
}
export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRunRequest;
/*
* The TaskManager is the public interface into the task manager system. This glues together
* all of the disparate modules in one integration point. The task manager operates in two different ways:
*
* - pre-init, it allows middleware registration, but disallows task manipulation
* - post-init, it disallows middleware registration, but allows task manipulation
*
* Due to its complexity, this is mostly tested by integration tests (see readme).
*/
/**
* The public interface into the task manager system.
*/
export class TaskManager {
private definitions: TaskDictionary<TaskDefinition> = {};
private store: TaskStore;
private bufferedStore: BufferedTaskStore;
private logger: Logger;
private pool: TaskPool;
// all task related events (task claimed, task marked as running, etc.) are emitted through events$
private events$ = new Subject<TaskLifecycleEvent>();
// all on-demand requests we wish to pipe into the poller
private claimRequests$ = new Subject<Option<string>>();
// the task poller that polls for work on fixed intervals and on demand
private poller$: Observable<Result<FillPoolResult, PollingError<string>>>;
// our subscription to the poller
private pollingSubscription: Subscription = Subscription.EMPTY;
private startQueue: Array<() => void> = [];
private middleware = {
beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts,
beforeRun: async (runOpts: RunContext) => runOpts,
beforeMarkRunning: async (runOpts: RunContext) => runOpts,
};
/**
* Initializes the task manager, preventing any further addition of middleware,
* enabling the task manipulation methods, and beginning the background polling
* mechanism.
*/
constructor(opts: TaskManagerOpts) {
this.logger = opts.logger;
const { taskManagerId } = opts;
if (!taskManagerId) {
this.logger.error(
`TaskManager is unable to start as there the Kibana UUID is invalid (value of the "server.uuid" configuration is ${taskManagerId})`
);
throw new Error(`TaskManager is unable to start as Kibana has no valid UUID assigned to it.`);
} else {
this.logger.info(`TaskManager is identified by the Kibana UUID: ${taskManagerId}`);
}
this.store = new TaskStore({
serializer: opts.serializer,
savedObjectsRepository: opts.savedObjectsRepository,
callCluster: (opts.callAsInternalUser as unknown) as ElasticJs,
index: opts.config.index,
maxAttempts: opts.config.max_attempts,
definitions: this.definitions,
taskManagerId: `kibana:${taskManagerId}`,
});
// pipe store events into the TaskManager's event stream
this.store.events.subscribe((event) => this.events$.next(event));
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
logger: this.logger,
errors$: this.store.errors$,
startingMaxWorkers: opts.config.max_workers,
startingPollInterval: opts.config.poll_interval,
});
this.bufferedStore = new BufferedTaskStore(this.store, {
bufferMaxOperations: opts.config.max_workers,
logger: this.logger,
});
this.pool = new TaskPool({
logger: this.logger,
maxWorkers$: maxWorkersConfiguration$,
});
const {
max_poll_inactivity_cycles: maxPollInactivityCycles,
poll_interval: pollInterval,
} = opts.config;
this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
() =>
createTaskPoller<string, FillPoolResult>({
logger: this.logger,
pollInterval$: pollIntervalConfiguration$,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: pollInterval * maxPollInactivityCycles,
}),
{
heartbeatInterval: pollInterval,
// Time out the poller itself if it has failed to complete the entire stream for a certain amount of time.
// This is different that the `work` timeout above, as the poller could enter an invalid state where
// it fails to complete a cycle even thought `work` is completing quickly.
// We grant it a single cycle longer than the time alotted to `work` so that timing out the `work`
// doesn't get short circuited by the monitor reinstantiating the poller all together (a far more expensive
// operation than just timing out the `work` internally)
inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1),
onError: (error) => {
this.logger.error(`[Task Poller Monitor]: ${error.message}`);
},
}
);
}
private emitEvent = (event: TaskLifecycleEvent) => {
this.events$.next(event);
};
private attemptToRun(task: string) {
this.claimRequests$.next(some(task));
}
private createTaskRunnerForTask = (instance: ConcreteTaskInstance) => {
return new TaskManagerRunner({
logger: this.logger,
instance,
store: this.bufferedStore,
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
beforeMarkRunning: this.middleware.beforeMarkRunning,
onTaskEvent: this.emitEvent,
});
};
public get isStarted() {
return !this.pollingSubscription.closed;
}
private pollForWork = async (...tasksToClaim: string[]): Promise<FillPoolResult> => {
return fillPool(
// claim available tasks
() =>
claimAvailableTasks(
tasksToClaim.splice(0, this.pool.availableWorkers),
this.store.claimAvailableTasks,
this.pool.availableWorkers,
this.logger
),
// wrap each task in a Task Runner
this.createTaskRunnerForTask,
// place tasks in the Task Pool
async (tasks: TaskRunner[]) => await this.pool.run(tasks)
);
};
/**
* Starts up the task manager and starts picking up tasks.
*/
public start() {
if (!this.isStarted) {
// Some calls are waiting until task manager is started
this.startQueue.forEach((fn) => fn());
this.startQueue = [];
this.pollingSubscription = this.poller$.subscribe(
mapErr((error: PollingError<string>) => {
if (error.type === PollingErrorType.RequestCapacityReached) {
pipe(
error.data,
mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
);
}
this.logger.error(error.message);
})
);
}
}
private async waitUntilStarted() {
if (!this.isStarted) {
await new Promise((resolve) => {
this.startQueue.push(resolve);
});
}
}
/**
* Stops the task manager and cancels running tasks.
*/
public stop() {
if (this.isStarted) {
this.pollingSubscription.unsubscribe();
this.pool.cancelRunningTasks();
}
}
/**
* Method for allowing consumers to register task definitions into the system.
* @param taskDefinitions - The Kibana task definitions dictionary
*/
public registerTaskDefinitions(taskDefinitions: TaskDictionary<TaskDefinition>) {
this.assertUninitialized('register task definitions', Object.keys(taskDefinitions).join(', '));
const duplicate = Object.keys(taskDefinitions).find((k) => !!this.definitions[k]);
if (duplicate) {
throw new Error(`Task ${duplicate} is already defined!`);
}
try {
const sanitized = sanitizeTaskDefinitions(taskDefinitions);
Object.assign(this.definitions, sanitized);
} catch (e) {
this.logger.error('Could not sanitize task definitions');
}
}
/**
* Adds middleware to the task manager, such as adding security layers, loggers, etc.
*
* @param {Middleware} middleware - The middlware being added.
*/
public addMiddleware(middleware: Middleware) {
this.assertUninitialized('add middleware');
const prevMiddleWare = this.middleware;
this.middleware = addMiddlewareToChain(prevMiddleWare, middleware);
}
/**
* Schedules a task.
*
* @param task - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async schedule(
taskInstance: TaskInstanceWithDeprecatedFields,
options?: Record<string, unknown>
): Promise<ConcreteTaskInstance> {
await this.waitUntilStarted();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
return await this.store.schedule(modifiedTask);
}
/**
* Run task.
*
* @param taskId - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async runNow(taskId: string): Promise<RunNowResult> {
await this.waitUntilStarted();
return new Promise(async (resolve, reject) => {
awaitTaskRunResult(taskId, this.events$, this.store.getLifecycle.bind(this.store))
.then(resolve)
.catch(reject);
this.attemptToRun(taskId);
});
}
/**
* Schedules a task with an Id
*
* @param task - The task being scheduled.
* @returns {Promise<TaskInstanceWithId>}
*/
public async ensureScheduled(
taskInstance: TaskInstanceWithId,
options?: Record<string, unknown>
): Promise<TaskInstanceWithId> {
try {
return await this.schedule(taskInstance, options);
} catch (err) {
if (err.statusCode === VERSION_CONFLICT_STATUS) {
return taskInstance;
}
throw err;
}
}
/**
* Fetches a list of scheduled tasks.
*
* @param opts - The query options used to filter tasks
* @returns {Promise<FetchResult>}
*/
public async fetch(opts: SearchOpts): Promise<FetchResult> {
await this.waitUntilStarted();
return this.store.fetch(opts);
}
/**
* Get the current state of a specified task.
*
* @param {string} id
* @returns {Promise<RemoveResult>}
*/
public async get(id: string): Promise<ConcreteTaskInstance> {
await this.waitUntilStarted();
return this.store.get(id);
}
/**
* Removes the specified task from the index.
*
* @param {string} id
* @returns {Promise<RemoveResult>}
*/
public async remove(id: string): Promise<void> {
await this.waitUntilStarted();
return this.store.remove(id);
}
/**
* Ensures task manager IS NOT already initialized
*
* @param {string} message shown if task manager is already initialized
* @returns void
*/
private assertUninitialized(message: string, context?: string) {
if (this.isStarted) {
throw new Error(
`${context ? `[${context}] ` : ''}Cannot ${message} after the task manager is initialized`
);
}
}
}
export async function claimAvailableTasks(
claimTasksById: string[],
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
availableWorkers: number,
logger: Logger
) {
if (availableWorkers > 0) {
performance.mark('claimAvailableTasks_start');
try {
const { docs, claimedTasks } = await claim({
size: availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
claimTasksById,
});
if (claimedTasks === 0) {
performance.mark('claimAvailableTasks.noTasks');
}
performance.mark('claimAvailableTasks_stop');
performance.measure(
'claimAvailableTasks',
'claimAvailableTasks_start',
'claimAvailableTasks_stop'
);
if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${
docs.length
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return docs;
} catch (ex) {
if (identifyEsError(ex).includes('cannot execute [inline] scripts')) {
logger.warn(
`Task Manager cannot operate when inline scripts are disabled in Elasticsearch`
);
} else {
throw ex;
}
}
} else {
performance.mark('claimAvailableTasks.noAvailableWorkers');
logger.debug(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.`
);
}
return [];
}
export async function awaitTaskRunResult(
taskId: string,
events$: Subject<TaskLifecycleEvent>,
getLifecycle: (id: string) => Promise<TaskLifecycle>
): Promise<RunNowResult> {
return new Promise((resolve, reject) => {
const subscription = events$
// listen for all events related to the current task
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
.subscribe((taskEvent: TaskLifecycleEvent) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
return reject(
map(
await pipe(
error,
mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)),
getOrElse(() =>
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
promiseResult<TaskLifecycle, Error>(getLifecycle(taskId))
)
),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return new Error(
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
);
},
(getLifecycleError: Error) =>
new Error(
`Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}`
)
)
);
}, taskEvent.event);
} else {
either<ConcreteTaskInstance, Error | Option<ConcreteTaskInstance>>(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error | Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
}
return reject(new Error(`Failed to run task "${taskId}": ${error}`));
}
);
}
});
});
}

View file

@ -12,7 +12,7 @@ import { Observable } from 'rxjs';
import moment, { Duration } from 'moment';
import { performance } from 'perf_hooks';
import { padStart } from 'lodash';
import { Logger } from './types';
import { Logger } from '../../../../src/core/server';
import { TaskRunner } from './task_runner';
import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';

View file

@ -9,11 +9,12 @@ import sinon from 'sinon';
import { minutesFromNow } from './lib/intervals';
import { asOk, asErr } from './lib/result_type';
import { TaskEvent, asTaskRunEvent, asTaskMarkRunningEvent } from './task_events';
import { ConcreteTaskInstance, TaskStatus, TaskDictionary, TaskDefinition } from './task';
import { ConcreteTaskInstance, TaskStatus, TaskDefinition, RunResult } from './task';
import { TaskManagerRunner } from './task_runner';
import { mockLogger } from './test_utils';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';
import moment from 'moment';
import { TaskTypeDictionary } from './task_type_dictionary';
import { mockLogger } from './test_utils';
let fakeTimer: sinon.SinonFakeTimers;
@ -67,6 +68,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
throw new Error('Dangit!');
@ -96,9 +98,10 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return;
return { state: {} };
},
}),
},
@ -124,10 +127,11 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `1m`,
createTaskRunner: () => ({
async run() {
return;
return { state: {} };
},
}),
},
@ -150,10 +154,11 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `1m`,
createTaskRunner: () => ({
async run() {
return;
return { state: {} };
},
}),
},
@ -171,9 +176,10 @@ describe('TaskManagerRunner', () => {
const { runner, store } = testOpts({
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { runAt };
return { runAt, state: {} };
},
}),
},
@ -194,9 +200,10 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { runAt };
return { runAt, state: {} };
},
}),
},
@ -218,6 +225,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return undefined;
@ -238,6 +246,7 @@ describe('TaskManagerRunner', () => {
const { runner, logger } = testOpts({
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
const promise = new Promise((r) => setTimeout(r, 1000));
@ -265,6 +274,7 @@ describe('TaskManagerRunner', () => {
const { runner, logger } = testOpts({
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
run: async () => undefined,
}),
@ -291,6 +301,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `${timeoutMinutes}m`,
createTaskRunner: () => ({
run: async () => undefined,
@ -325,6 +336,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
@ -356,6 +368,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
@ -388,6 +401,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
@ -421,6 +435,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
@ -456,6 +471,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
@ -490,6 +506,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
@ -522,6 +539,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
@ -557,6 +575,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
@ -592,6 +611,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
@ -625,6 +645,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
@ -655,6 +676,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
maxAttempts: 3,
createTaskRunner: () => ({
run: async () => {
@ -688,6 +710,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
maxAttempts: 3,
createTaskRunner: () => ({
run: async () => {
@ -720,8 +743,8 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `1m`,
getRetry: () => {},
createTaskRunner: () => ({
run: async () => undefined,
}),
@ -748,8 +771,8 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
timeout: `1m`,
getRetry: () => {},
createTaskRunner: () => ({
run: async () => undefined,
}),
@ -777,9 +800,10 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return {};
return { state: {} };
},
}),
},
@ -803,9 +827,10 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { runAt };
return { runAt, state: {} };
},
}),
},
@ -828,6 +853,7 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
throw error;
@ -855,9 +881,10 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { error };
return { error, state: {} };
},
}),
},
@ -882,10 +909,11 @@ describe('TaskManagerRunner', () => {
},
definitions: {
bar: {
title: 'Bar!',
getRetry: () => false,
createTaskRunner: () => ({
async run() {
return { error };
return { error, state: {} };
},
}),
},
@ -904,7 +932,7 @@ describe('TaskManagerRunner', () => {
interface TestOpts {
instance?: Partial<ConcreteTaskInstance>;
definitions?: unknown;
definitions?: Record<string, Omit<TaskDefinition, 'type'>>;
onTaskEvent?: (event: TaskEvent<unknown, unknown>) => void;
}
@ -942,19 +970,24 @@ describe('TaskManagerRunner', () => {
store.update.returns(instance);
const definitions = new TaskTypeDictionary(logger);
definitions.registerTaskDefinitions({
testbar: {
title: 'Bar!',
createTaskRunner,
},
});
if (opts.definitions) {
definitions.registerTaskDefinitions(opts.definitions);
}
const runner = new TaskManagerRunner({
beforeRun: (context) => Promise.resolve(context),
beforeMarkRunning: (context) => Promise.resolve(context),
logger,
store,
instance,
definitions: Object.assign(opts.definitions || {}, {
testbar: {
type: 'bar',
title: 'Bar!',
createTaskRunner,
},
}) as TaskDictionary<TaskDefinition>,
definitions,
onTaskEvent: opts.onTaskEvent,
});
@ -972,8 +1005,9 @@ describe('TaskManagerRunner', () => {
const { runner, logger } = testOpts({
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
run: async () => result,
run: async () => result as RunResult,
}),
},
},

View file

@ -15,11 +15,11 @@ import { performance } from 'perf_hooks';
import Joi from 'joi';
import { identity, defaults, flow } from 'lodash';
import { Logger } from '../../../../src/core/server';
import { asOk, asErr, mapErr, eitherAsync, unwrap, mapOk, Result } from './lib/result_type';
import { TaskRun, TaskMarkRunning, asTaskRunEvent, asTaskMarkRunningEvent } from './task_events';
import { intervalFromDate, intervalFromNow } from './lib/intervals';
import { Logger } from './types';
import { BeforeRunFunction, BeforeMarkRunningFunction } from './lib/middleware';
import { Middleware } from './lib/middleware';
import {
CancelFunction,
CancellableTask,
@ -29,10 +29,10 @@ import {
FailedRunResult,
FailedTaskResult,
TaskDefinition,
TaskDictionary,
validateRunResult,
TaskStatus,
} from './task';
import { TaskTypeDictionary } from './task_type_dictionary';
const defaultBackoffPerFailure = 5 * 60 * 1000;
const EMPTY_RUN_RESULT: SuccessfulRunResult = {};
@ -55,15 +55,13 @@ export interface Updatable {
remove(id: string): Promise<void>;
}
interface Opts {
type Opts = {
logger: Logger;
definitions: TaskDictionary<TaskDefinition>;
definitions: TaskTypeDictionary;
instance: ConcreteTaskInstance;
store: Updatable;
beforeRun: BeforeRunFunction;
beforeMarkRunning: BeforeMarkRunningFunction;
onTaskEvent?: (event: TaskRun | TaskMarkRunning) => void;
}
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;
/**
* Runs a background task, ensures that errors are properly handled,
@ -76,11 +74,11 @@ interface Opts {
export class TaskManagerRunner implements TaskRunner {
private task?: CancellableTask;
private instance: ConcreteTaskInstance;
private definitions: TaskDictionary<TaskDefinition>;
private definitions: TaskTypeDictionary;
private logger: Logger;
private bufferedTaskStore: Updatable;
private beforeRun: BeforeRunFunction;
private beforeMarkRunning: BeforeMarkRunningFunction;
private beforeRun: Middleware['beforeRun'];
private beforeMarkRunning: Middleware['beforeMarkRunning'];
private onTaskEvent: (event: TaskRun | TaskMarkRunning) => void;
/**
@ -129,7 +127,7 @@ export class TaskManagerRunner implements TaskRunner {
* Gets the task defintion from the dictionary.
*/
public get definition() {
return this.definitions[this.taskType];
return this.definitions.get(this.taskType);
}
/**

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskScheduling } from './task_scheduling';
const createTaskSchedulingMock = () => {
return ({
ensureScheduled: jest.fn(),
schedule: jest.fn(),
runNow: jest.fn(),
} as unknown) as jest.Mocked<TaskScheduling>;
};
export const taskSchedulingMock = {
create: createTaskSchedulingMock,
};

View file

@ -0,0 +1,319 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import _ from 'lodash';
import { Subject } from 'rxjs';
import { none } from 'fp-ts/lib/Option';
import {
asTaskMarkRunningEvent,
asTaskRunEvent,
asTaskClaimEvent,
asTaskRunRequestEvent,
} from './task_events';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { taskPollingLifecycleMock } from './polling_lifecycle.mock';
import { TaskScheduling } from './task_scheduling';
import { asErr, asOk } from './lib/result_type';
import { ConcreteTaskInstance, TaskLifecycleResult, TaskStatus } from './task';
import { createInitialMiddleware } from './lib/middleware';
import { taskStoreMock } from './task_store.mock';
import { mockLogger } from './test_utils';
describe('TaskScheduling', () => {
const mockTaskStore = taskStoreMock.create({});
const mockTaskManager = taskPollingLifecycleMock.create({});
const taskSchedulingOpts = {
taskStore: mockTaskStore,
taskPollingLifecycle: mockTaskManager,
logger: mockLogger(),
middleware: createInitialMiddleware(),
};
beforeEach(() => {
jest.resetAllMocks();
});
test('allows scheduling tasks', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task = {
taskType: 'foo',
params: {},
state: {},
};
await taskScheduling.schedule(task);
expect(mockTaskStore.schedule).toHaveBeenCalled();
});
test('allows scheduling existing tasks that may have already been scheduled', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.schedule.mockRejectedValueOnce({
statusCode: 409,
});
const result = await taskScheduling.ensureScheduled({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
});
expect(result.id).toEqual('my-foo-id');
});
test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.schedule.mockRejectedValueOnce({
statusCode: 500,
});
return expect(
taskScheduling.ensureScheduled({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 500,
});
});
test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.schedule.mockRejectedValueOnce({
statusCode: 409,
});
return expect(
taskScheduling.schedule({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 409,
});
});
describe('runNow', () => {
test('resolves when the task run succeeds', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskRunEvent(id, asOk(task)));
return expect(result).resolves.toEqual({ id });
});
test('rejects when the task run fails', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
test('rejects when the task mark as running fails', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
const task = { id } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
test('when a task claim fails we ensure the task exists', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskLifecycleResult.NotFound);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it does not exist`)
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we ensure the task isnt already claimed', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Claiming);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we ensure the task isnt already running', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Running);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('rejects when the task run fails due to capacity', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Idle);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(asTaskRunRequestEvent(id, asErr(new Error('failed to buffer request'))));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}": Task Manager is at capacity, please try again later`)
);
expect(mockTaskStore.getLifecycle).not.toHaveBeenCalled();
});
test('when a task claim fails we return the underlying error if the task is idle', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Idle);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]`
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we return the underlying error if the task is failed', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Failed);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(asTaskClaimEvent(id, asErr(none)));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]`
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('ignores task run success of other tasks', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const differentTask = '4bebf429-181b-4518-bb7d-b4246d8a35f0';
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
const task = { id } as ConcreteTaskInstance;
const otherTask = { id: differentTask } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskClaimEvent(differentTask, asOk(otherTask)));
events$.next(asTaskRunEvent(differentTask, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
});
});

View file

@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { filter } from 'rxjs/operators';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
import { Logger } from '../../../../src/core/server';
import { asOk, either, map, mapErr, promiseResult } from './lib/result_type';
import { isTaskRunEvent, isTaskClaimEvent, isTaskRunRequestEvent } from './task_events';
import { Middleware } from './lib/middleware';
import {
ConcreteTaskInstance,
TaskInstanceWithId,
TaskInstanceWithDeprecatedFields,
TaskLifecycle,
TaskLifecycleResult,
TaskStatus,
} from './task';
import { TaskStore } from './task_store';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle';
const VERSION_CONFLICT_STATUS = 409;
export interface TaskSchedulingOpts {
logger: Logger;
taskStore: TaskStore;
taskPollingLifecycle: TaskPollingLifecycle;
middleware: Middleware;
}
interface RunNowResult {
id: string;
}
export class TaskScheduling {
private store: TaskStore;
private taskPollingLifecycle: TaskPollingLifecycle;
private logger: Logger;
private middleware: Middleware;
/**
* Initializes the task manager, preventing any further addition of middleware,
* enabling the task manipulation methods, and beginning the background polling
* mechanism.
*/
constructor(opts: TaskSchedulingOpts) {
this.logger = opts.logger;
this.middleware = opts.middleware;
this.taskPollingLifecycle = opts.taskPollingLifecycle;
this.store = opts.taskStore;
}
/**
* Schedules a task.
*
* @param task - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async schedule(
taskInstance: TaskInstanceWithDeprecatedFields,
options?: Record<string, unknown>
): Promise<ConcreteTaskInstance> {
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
return await this.store.schedule(modifiedTask);
}
/**
* Run task.
*
* @param taskId - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async runNow(taskId: string): Promise<RunNowResult> {
return new Promise(async (resolve, reject) => {
this.awaitTaskRunResult(taskId).then(resolve).catch(reject);
this.taskPollingLifecycle.attemptToRun(taskId);
});
}
/**
* Schedules a task with an Id
*
* @param task - The task being scheduled.
* @returns {Promise<TaskInstanceWithId>}
*/
public async ensureScheduled(
taskInstance: TaskInstanceWithId,
options?: Record<string, unknown>
): Promise<TaskInstanceWithId> {
try {
return await this.schedule(taskInstance, options);
} catch (err) {
if (err.statusCode === VERSION_CONFLICT_STATUS) {
return taskInstance;
}
throw err;
}
}
private async awaitTaskRunResult(taskId: string): Promise<RunNowResult> {
return new Promise((resolve, reject) => {
const subscription = this.taskPollingLifecycle.events
// listen for all events related to the current task
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
.subscribe((taskEvent: TaskLifecycleEvent) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
return reject(await this.identifyTaskFailureReason(taskId, error));
}, taskEvent.event);
} else {
either<ConcreteTaskInstance, Error | Option<ConcreteTaskInstance>>(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error | Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
return reject(
new Error(
`Failed to run task "${taskId}": ${
isTaskRunRequestEvent(taskEvent)
? `Task Manager is at capacity, please try again later`
: error
}`
)
);
}
);
}
});
});
}
private async identifyTaskFailureReason(taskId: string, error: Option<ConcreteTaskInstance>) {
return map(
await pipe(
error,
mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)),
getOrElse(() =>
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
promiseResult<TaskLifecycle, Error>(this.store.getLifecycle(taskId))
)
),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return new Error(
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
);
},
(getLifecycleError: Error) =>
new Error(
`Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}`
)
);
}
}

View file

@ -4,15 +4,24 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Observable, Subject } from 'rxjs';
import { TaskClaim } from './task_events';
import { TaskStore } from './task_store';
interface TaskStoreOptions {
maxAttempts?: number;
index?: string;
taskManagerId?: string;
events?: Observable<TaskClaim>;
}
export const taskStoreMock = {
create({ maxAttempts = 0, index = '', taskManagerId = '' }: TaskStoreOptions) {
create({
maxAttempts = 0,
index = '',
taskManagerId = '',
events = new Subject<TaskClaim>(),
}: TaskStoreOptions) {
const mocked = ({
update: jest.fn(),
remove: jest.fn(),
@ -25,6 +34,7 @@ export const taskStoreMock = {
maxAttempts,
index,
taskManagerId,
events,
} as unknown) as jest.Mocked<TaskStore>;
return mocked;
},

View file

@ -5,20 +5,18 @@
*/
import _ from 'lodash';
import sinon from 'sinon';
import uuid from 'uuid';
import { filter, take, first } from 'rxjs/operators';
import { Option, some, none } from 'fp-ts/lib/Option';
import {
TaskDictionary,
TaskDefinition,
TaskInstance,
TaskStatus,
TaskLifecycleResult,
SerializedConcreteTaskInstance,
ConcreteTaskInstance,
} from './task';
import { elasticsearchServiceMock } from '../../../../src/core/server/mocks';
import { StoreOpts, OwnershipClaimingOpts, TaskStore, SearchOpts } from './task_store';
import { savedObjectsRepositoryMock } from 'src/core/server/mocks';
import {
@ -29,24 +27,11 @@ import {
} from 'src/core/server';
import { asTaskClaimEvent, TaskEvent } from './task_events';
import { asOk, asErr } from './lib/result_type';
const taskDefinitions: TaskDictionary<TaskDefinition> = {
report: {
type: 'report',
title: '',
createTaskRunner: jest.fn(),
},
dernstraight: {
type: 'dernstraight',
title: '',
createTaskRunner: jest.fn(),
},
yawn: {
type: 'yawn',
title: '',
createTaskRunner: jest.fn(),
},
};
import { TaskTypeDictionary } from './task_type_dictionary';
import { RequestEvent } from '@elastic/elasticsearch/lib/Transport';
import { Search, UpdateByQuery } from '@elastic/elasticsearch/api/requestParams';
import { BoolClauseWithAnyCondition, TermFilter } from './queries/query_clauses';
import { mockLogger } from './test_utils';
const savedObjectsClient = savedObjectsRepositoryMock.create();
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
@ -64,6 +49,22 @@ const mockedDate = new Date('2019-02-12T21:01:22.479Z');
}
};
const taskDefinitions = new TaskTypeDictionary(mockLogger());
taskDefinitions.registerTaskDefinitions({
report: {
title: 'report',
createTaskRunner: jest.fn(),
},
dernstraight: {
title: 'dernstraight',
createTaskRunner: jest.fn(),
},
yawn: {
title: 'yawn',
createTaskRunner: jest.fn(),
},
});
describe('TaskStore', () => {
describe('schedule', () => {
let store: TaskStore;
@ -73,7 +74,7 @@ describe('TaskStore', () => {
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -198,14 +199,15 @@ describe('TaskStore', () => {
describe('fetch', () => {
let store: TaskStore;
const callCluster = jest.fn();
let esClient: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];
beforeAll(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
esClient,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -213,16 +215,15 @@ describe('TaskStore', () => {
});
async function testFetch(opts?: SearchOpts, hits: unknown[] = []) {
callCluster.mockResolvedValue({ hits: { hits } });
esClient.search.mockResolvedValue(asApiResponse({ hits: { hits } }));
const result = await store.fetch(opts);
expect(callCluster).toHaveBeenCalledTimes(1);
expect(callCluster).toHaveBeenCalledWith('search', expect.anything());
expect(esClient.search).toHaveBeenCalledTimes(1);
return {
result,
args: callCluster.mock.calls[0][1],
args: esClient.search.mock.calls[0][0],
};
}
@ -257,7 +258,7 @@ describe('TaskStore', () => {
test('pushes error from call cluster to errors$', async () => {
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
callCluster.mockRejectedValue(new Error('Failure'));
esClient.search.mockRejectedValue(new Error('Failure'));
await expect(store.fetch()).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
@ -274,17 +275,18 @@ describe('TaskStore', () => {
claimingOpts: OwnershipClaimingOpts;
}) {
const versionConflicts = 2;
const callCluster = sinon.spy(async (name: string, params?: unknown) =>
name === 'updateByQuery'
? {
total: hits.length + versionConflicts,
updated: hits.length,
version_conflicts: versionConflicts,
}
: { hits: { hits } }
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
esClient.search.mockResolvedValue(asApiResponse({ hits: { hits } }));
esClient.updateByQuery.mockResolvedValue(
asApiResponse({
total: hits.length + versionConflicts,
updated: hits.length,
version_conflicts: versionConflicts,
})
);
const store = new TaskStore({
callCluster,
esClient,
maxAttempts: 2,
definitions: taskDefinitions,
serializer,
@ -296,26 +298,41 @@ describe('TaskStore', () => {
const result = await store.claimAvailableTasks(claimingOpts);
sinon.assert.calledTwice(callCluster);
sinon.assert.calledWithMatch(callCluster, 'updateByQuery', { max_docs: claimingOpts.size });
sinon.assert.calledWithMatch(callCluster, 'search', { body: { size: claimingOpts.size } });
expect(esClient.updateByQuery.mock.calls[0][0]).toMatchObject({
max_docs: claimingOpts.size,
});
expect(esClient.search.mock.calls[0][0]).toMatchObject({ body: { size: claimingOpts.size } });
return {
result,
args: Object.assign({}, ...callCluster.args.map(([name, args]) => ({ [name]: args }))),
args: {
search: esClient.search.mock.calls[0][0]! as Search<{
query: BoolClauseWithAnyCondition<TermFilter>;
size: number;
sort: string | string[];
}>,
updateByQuery: esClient.updateByQuery.mock.calls[0][0]! as UpdateByQuery<{
query: BoolClauseWithAnyCondition<TermFilter>;
size: number;
sort: string | string[];
script: object;
}>,
},
};
}
test('it returns normally with no tasks when the index does not exist.', async () => {
const callCluster = sinon.spy(async (name: string, params?: unknown) => ({
total: 0,
updated: 0,
}));
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
esClient.updateByQuery.mockResolvedValue(
asApiResponse({
total: 0,
updated: 0,
})
);
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
esClient,
definitions: taskDefinitions,
maxAttempts: 2,
savedObjectsRepository: savedObjectsClient,
@ -324,9 +341,8 @@ describe('TaskStore', () => {
claimOwnershipUntil: new Date(),
size: 10,
});
sinon.assert.calledOnce(callCluster);
sinon.assert.calledWithMatch(callCluster, 'updateByQuery', {
ignoreUnavailable: true,
expect(esClient.updateByQuery.mock.calls[0][0]).toMatchObject({
ignore_unavailable: true,
max_docs: 10,
});
expect(docs.length).toBe(0);
@ -335,28 +351,28 @@ describe('TaskStore', () => {
test('it filters claimed tasks down by supported types, maxAttempts, status, and runAt', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
bar: {
title: 'bar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});
const {
args: {
updateByQuery: {
body: { query },
},
updateByQuery: { body: { query } = {} },
},
} = await testClaimAvailableTasks({
opts: {
maxAttempts,
definitions: {
foo: {
type: 'foo',
title: '',
createTaskRunner: jest.fn(),
},
bar: {
type: 'bar',
title: '',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
},
definitions,
},
claimingOpts: { claimOwnershipUntil: new Date(), size: 10 },
});
@ -465,28 +481,26 @@ describe('TaskStore', () => {
test('it supports claiming specific tasks by id', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
bar: {
title: 'bar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});
const {
args: {
updateByQuery: {
body: { query, sort },
},
updateByQuery: { body: { query, sort } = {} },
},
} = await testClaimAvailableTasks({
opts: {
maxAttempts,
definitions: {
foo: {
type: 'foo',
title: '',
createTaskRunner: jest.fn(),
},
bar: {
type: 'bar',
title: '',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
},
definitions,
},
claimingOpts: {
claimOwnershipUntil: new Date(),
@ -634,9 +648,7 @@ if (doc['task.runAt'].size()!=0) {
const claimOwnershipUntil = new Date(Date.now());
const {
args: {
updateByQuery: {
body: { script },
},
updateByQuery: { body: { script } = {} },
},
} = await testClaimAvailableTasks({
opts: {
@ -710,9 +722,7 @@ if (doc['task.runAt'].size()!=0) {
const {
result: { docs },
args: {
search: {
body: { query },
},
search: { body: { query } = {} },
},
} = await testClaimAvailableTasks({
opts: {
@ -725,7 +735,7 @@ if (doc['task.runAt'].size()!=0) {
hits: tasks,
});
expect(query.bool.must).toContainEqual({
expect(query?.bool?.must).toContainEqual({
bool: {
must: [
{
@ -804,11 +814,9 @@ if (doc['task.runAt'].size()!=0) {
},
];
const {
result: { docs },
result: { docs } = {},
args: {
search: {
body: { query },
},
search: { body: { query } = {} },
},
} = await testClaimAvailableTasks({
opts: {
@ -821,7 +829,7 @@ if (doc['task.runAt'].size()!=0) {
hits: tasks,
});
expect(query.bool.must).toContainEqual({
expect(query?.bool?.must).toContainEqual({
bool: {
must: [
{
@ -900,11 +908,9 @@ if (doc['task.runAt'].size()!=0) {
},
];
const {
result: { docs },
result: { docs } = {},
args: {
search: {
body: { query },
},
search: { body: { query } = {} },
},
} = await testClaimAvailableTasks({
opts: {
@ -917,7 +923,7 @@ if (doc['task.runAt'].size()!=0) {
hits: tasks,
});
expect(query.bool.must).toContainEqual({
expect(query?.bool?.must).toContainEqual({
bool: {
must: [
{
@ -961,19 +967,19 @@ if (doc['task.runAt'].size()!=0) {
});
test('pushes error from saved objects client to errors$', async () => {
const callCluster = jest.fn();
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
esClient,
definitions: taskDefinitions,
maxAttempts: 2,
savedObjectsRepository: savedObjectsClient,
});
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
callCluster.mockRejectedValue(new Error('Failure'));
esClient.updateByQuery.mockRejectedValue(new Error('Failure'));
await expect(
store.claimAvailableTasks({
claimOwnershipUntil: new Date(),
@ -986,13 +992,15 @@ if (doc['task.runAt'].size()!=0) {
describe('update', () => {
let store: TaskStore;
let esClient: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];
beforeAll(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
esClient,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -1092,7 +1100,7 @@ if (doc['task.runAt'].size()!=0) {
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -1132,7 +1140,7 @@ if (doc['task.runAt'].size()!=0) {
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -1140,17 +1148,18 @@ if (doc['task.runAt'].size()!=0) {
});
test('removes the task with the specified id', async () => {
const id = `id-${_.random(1, 20)}`;
const id = randomId();
const result = await store.remove(id);
expect(result).toBeUndefined();
expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id);
});
test('pushes error from saved objects client to errors$', async () => {
const id = `id-${_.random(1, 20)}`;
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.delete.mockRejectedValue(new Error('Failure'));
await expect(store.remove(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
await expect(store.remove(randomId())).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure"`
);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});
@ -1163,7 +1172,7 @@ if (doc['task.runAt'].size()!=0) {
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
@ -1171,13 +1180,12 @@ if (doc['task.runAt'].size()!=0) {
});
test('gets the task with the specified id', async () => {
const id = `id-${_.random(1, 20)}`;
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id,
id: randomId(),
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
@ -1198,18 +1206,17 @@ if (doc['task.runAt'].size()!=0) {
version: '123',
}));
const result = await store.get(id);
const result = await store.get(task.id);
expect(result).toEqual(task);
expect(savedObjectsClient.get).toHaveBeenCalledWith('task', id);
expect(savedObjectsClient.get).toHaveBeenCalledWith('task', task.id);
});
test('pushes error from saved objects client to errors$', async () => {
const id = `id-${_.random(1, 20)}`;
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.get.mockRejectedValue(new Error('Failure'));
await expect(store.get(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
await expect(store.get(randomId())).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});
@ -1219,13 +1226,12 @@ if (doc['task.runAt'].size()!=0) {
expect.assertions(4);
return Promise.all(
Object.values(TaskStatus).map(async (status) => {
const id = `id-${_.random(1, 20)}`;
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id,
id: randomId(),
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
@ -1235,7 +1241,6 @@ if (doc['task.runAt'].size()!=0) {
ownerId: null,
};
const callCluster = jest.fn();
savedObjectsClient.get.mockImplementation(async (type: string, objectId: string) => ({
id: objectId,
type,
@ -1251,20 +1256,18 @@ if (doc['task.runAt'].size()!=0) {
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
expect(await store.getLifecycle(id)).toEqual(status);
expect(await store.getLifecycle(task.id)).toEqual(status);
})
);
});
test('returns NotFound status if the task doesnt exists ', async () => {
const id = `id-${_.random(1, 20)}`;
savedObjectsClient.get.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id')
);
@ -1273,18 +1276,16 @@ if (doc['task.runAt'].size()!=0) {
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
expect(await store.getLifecycle(id)).toEqual(TaskLifecycleResult.NotFound);
expect(await store.getLifecycle(randomId())).toEqual(TaskLifecycleResult.NotFound);
});
test('throws if an unknown error takes place ', async () => {
const id = `id-${_.random(1, 20)}`;
savedObjectsClient.get.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createBadRequestError()
);
@ -1293,13 +1294,13 @@ if (doc['task.runAt'].size()!=0) {
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
return expect(store.getLifecycle(id)).rejects.toThrow('Bad Request');
return expect(store.getLifecycle(randomId())).rejects.toThrow('Bad Request');
});
});
@ -1385,18 +1386,20 @@ if (doc['task.runAt'].size()!=0) {
return { taskManagerId, runAt, tasks };
}
test('emits an event when a task is succesfully claimed by id', async () => {
function instantiateStoreWithMockedApiResponses() {
const { taskManagerId, runAt, tasks } = generateTasks();
const callCluster = sinon.spy(async (name: string, params?: unknown) =>
name === 'updateByQuery'
? {
total: tasks.length,
updated: tasks.length,
}
: { hits: { hits: tasks } }
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
esClient.search.mockResolvedValue(asApiResponse({ hits: { hits: tasks } }));
esClient.updateByQuery.mockResolvedValue(
asApiResponse({
total: tasks.length,
updated: tasks.length,
})
);
const store = new TaskStore({
callCluster,
esClient,
maxAttempts: 2,
definitions: taskDefinitions,
serializer,
@ -1405,6 +1408,12 @@ if (doc['task.runAt'].size()!=0) {
index: '',
});
return { taskManagerId, runAt, store };
}
test('emits an event when a task is succesfully claimed by id', async () => {
const { taskManagerId, runAt, store } = instantiateStoreWithMockedApiResponses();
const promise = store.events
.pipe(
filter(
@ -1446,24 +1455,7 @@ if (doc['task.runAt'].size()!=0) {
});
test('emits an event when a task is succesfully by scheduling', async () => {
const { taskManagerId, runAt, tasks } = generateTasks();
const callCluster = sinon.spy(async (name: string, params?: unknown) =>
name === 'updateByQuery'
? {
total: tasks.length,
updated: tasks.length,
}
: { hits: { hits: tasks } }
);
const store = new TaskStore({
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
serializer,
savedObjectsRepository: savedObjectsClient,
taskManagerId,
index: '',
});
const { taskManagerId, runAt, store } = instantiateStoreWithMockedApiResponses();
const promise = store.events
.pipe(
@ -1506,24 +1498,7 @@ if (doc['task.runAt'].size()!=0) {
});
test('emits an event when the store fails to claim a required task by id', async () => {
const { taskManagerId, runAt, tasks } = generateTasks();
const callCluster = sinon.spy(async (name: string, params?: unknown) =>
name === 'updateByQuery'
? {
total: tasks.length,
updated: tasks.length,
}
: { hits: { hits: tasks } }
);
const store = new TaskStore({
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
serializer,
savedObjectsRepository: savedObjectsClient,
taskManagerId,
index: '',
});
const { taskManagerId, runAt, store } = instantiateStoreWithMockedApiResponses();
const promise = store.events
.pipe(
@ -1568,24 +1543,7 @@ if (doc['task.runAt'].size()!=0) {
});
test('emits an event when the store fails to find a task which was required by id', async () => {
const { taskManagerId, tasks } = generateTasks();
const callCluster = sinon.spy(async (name: string, params?: unknown) =>
name === 'updateByQuery'
? {
total: tasks.length,
updated: tasks.length,
}
: { hits: { hits: tasks } }
);
const store = new TaskStore({
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
serializer,
savedObjectsRepository: savedObjectsClient,
taskManagerId,
index: '',
});
const { store } = instantiateStoreWithMockedApiResponses();
const promise = store.events
.pipe(
@ -1621,3 +1579,10 @@ function generateFakeTasks(count: number = 1) {
sort: ['a', _.random(1, 5)],
}));
}
const asApiResponse = <T>(body: T): RequestEvent<T> =>
({
body,
} as RequestEvent<T>);
const randomId = () => `id-${_.random(1, 20)}`;

View file

@ -20,15 +20,13 @@ import {
SavedObjectsRawDoc,
ISavedObjectsRepository,
SavedObjectsUpdateResponse,
ElasticsearchClient,
} from '../../../../src/core/server';
import { asOk, asErr, Result } from './lib/result_type';
import {
ConcreteTaskInstance,
ElasticJs,
TaskDefinition,
TaskDictionary,
TaskInstance,
TaskLifecycle,
TaskLifecycleResult,
@ -60,13 +58,14 @@ import {
SortByRunAtAndRetryAt,
tasksClaimedByOwner,
} from './queries/mark_available_tasks_as_claimed';
import { TaskTypeDictionary } from './task_type_dictionary';
export interface StoreOpts {
callCluster: ElasticJs;
esClient: ElasticsearchClient;
index: string;
taskManagerId: string;
maxAttempts: number;
definitions: TaskDictionary<TaskDefinition>;
definitions: TaskTypeDictionary;
savedObjectsRepository: ISavedObjectsRepository;
serializer: SavedObjectsSerializer;
}
@ -123,8 +122,8 @@ export class TaskStore {
public readonly taskManagerId: string;
public readonly errors$ = new Subject<Error>();
private callCluster: ElasticJs;
private definitions: TaskDictionary<TaskDefinition>;
private esClient: ElasticsearchClient;
private definitions: TaskTypeDictionary;
private savedObjectsRepository: ISavedObjectsRepository;
private serializer: SavedObjectsSerializer;
private events$: Subject<TaskClaim>;
@ -132,7 +131,7 @@ export class TaskStore {
/**
* Constructs a new TaskStore.
* @param {StoreOpts} opts
* @prop {CallCluster} callCluster - The elastic search connection
* @prop {esClient} esClient - An elasticsearch client
* @prop {string} index - The name of the task manager index
* @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned
* @prop {TaskDefinition} definition - The definition of the task being run
@ -140,7 +139,7 @@ export class TaskStore {
* @prop {savedObjectsRepository} - An instance to the saved objects repository
*/
constructor(opts: StoreOpts) {
this.callCluster = opts.callCluster;
this.esClient = opts.esClient;
this.index = opts.index;
this.taskManagerId = opts.taskManagerId;
this.maxAttempts = opts.maxAttempts;
@ -164,13 +163,7 @@ export class TaskStore {
* @param task - The task being scheduled.
*/
public async schedule(taskInstance: TaskInstance): Promise<ConcreteTaskInstance> {
if (!this.definitions[taskInstance.taskType]) {
throw new Error(
`Unsupported task type "${taskInstance.taskType}". Supported types are ${Object.keys(
this.definitions
).join(', ')}`
);
}
this.definitions.ensureHas(taskInstance.taskType);
let savedObject;
try {
@ -265,6 +258,9 @@ export class TaskStore {
claimTasksById: OwnershipClaimingOpts['claimTasksById'],
size: OwnershipClaimingOpts['size']
): Promise<number> {
const tasksWithRemainingAttempts = [...this.definitions].map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || this.maxAttempts)
);
const queryForScheduledTasks = mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
@ -272,9 +268,7 @@ export class TaskStore {
// Either task has a schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
TaskWithSchedule,
...Object.entries(this.definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || this.maxAttempts)
)
...tasksWithRemainingAttempts
)
);
@ -473,30 +467,31 @@ export class TaskStore {
private async search(opts: SearchOpts = {}): Promise<FetchResult> {
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
let result;
try {
result = await this.callCluster('search', {
const {
body: {
hits: { hits: tasks },
},
} = await this.esClient.search<SearchResponse<SavedObjectsRawDoc['_source']>>({
index: this.index,
ignoreUnavailable: true,
ignore_unavailable: true,
body: {
...opts,
query,
},
});
return {
docs: tasks
.filter((doc) => this.serializer.isRawSavedObject(doc))
.map((doc) => this.serializer.rawToSavedObject(doc))
.map((doc) => omit(doc, 'namespace') as SavedObject<SerializedConcreteTaskInstance>)
.map(savedObjectToConcreteTaskInstance),
};
} catch (e) {
this.errors$.next(e);
throw e;
}
const rawDocs = (result as SearchResponse<unknown>).hits.hits;
return {
docs: (rawDocs as SavedObjectsRawDoc[])
.filter((doc) => this.serializer.isRawSavedObject(doc))
.map((doc) => this.serializer.rawToSavedObject(doc))
.map((doc) => omit(doc, 'namespace') as SavedObject<SerializedConcreteTaskInstance>)
.map(savedObjectToConcreteTaskInstance),
};
}
private async updateByQuery(
@ -505,11 +500,13 @@ export class TaskStore {
{ max_docs }: UpdateByQueryOpts = {}
): Promise<UpdateByQueryResult> {
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
let result;
try {
result = await this.callCluster('updateByQuery', {
const {
// eslint-disable-next-line @typescript-eslint/naming-convention
body: { total, updated, version_conflicts },
} = await this.esClient.updateByQuery<UpdateDocumentByQueryResponse>({
index: this.index,
ignoreUnavailable: true,
ignore_unavailable: true,
refresh: true,
max_docs,
conflicts: 'proceed',
@ -518,18 +515,16 @@ export class TaskStore {
query,
},
});
return {
total,
updated,
version_conflicts,
};
} catch (e) {
this.errors$.next(e);
throw e;
}
// eslint-disable-next-line @typescript-eslint/naming-convention
const { total, updated, version_conflicts } = result as UpdateDocumentByQueryResponse;
return {
total,
updated,
version_conflicts,
};
}
}

View file

@ -5,8 +5,8 @@
*/
import { get } from 'lodash';
import { RunContext, TaskDictionary, TaskDefinition } from '../task';
import { sanitizeTaskDefinitions } from './sanitize_task_definitions';
import { RunContext, TaskDefinition } from './task';
import { sanitizeTaskDefinitions } from './task_type_dictionary';
interface Opts {
numTasks: number;
@ -35,39 +35,40 @@ const getMockTaskDefinitions = (opts: Opts) => {
},
};
}
return (tasks as unknown) as TaskDictionary<TaskDefinition>;
return (tasks as unknown) as Record<string, TaskDefinition>;
};
describe('sanitizeTaskDefinitions', () => {
describe('taskTypeDictionary', () => {
describe('sanitizeTaskDefinitions', () => {});
it('provides tasks with defaults', () => {
const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 });
const result = sanitizeTaskDefinitions(taskDefinitions);
expect(result).toMatchInlineSnapshot(`
Object {
"test_task_type_0": Object {
"createTaskRunner": [Function],
"description": "one super cool task",
"timeout": "5m",
"title": "Test",
"type": "test_task_type_0",
},
"test_task_type_1": Object {
"createTaskRunner": [Function],
"description": "one super cool task",
"timeout": "5m",
"title": "Test",
"type": "test_task_type_1",
},
"test_task_type_2": Object {
"createTaskRunner": [Function],
"description": "one super cool task",
"timeout": "5m",
"title": "Test",
"type": "test_task_type_2",
},
}
`);
Array [
Object {
"createTaskRunner": [Function],
"description": "one super cool task",
"timeout": "5m",
"title": "Test",
"type": "test_task_type_0",
},
Object {
"createTaskRunner": [Function],
"description": "one super cool task",
"timeout": "5m",
"title": "Test",
"type": "test_task_type_1",
},
Object {
"createTaskRunner": [Function],
"description": "one super cool task",
"timeout": "5m",
"title": "Test",
"type": "test_task_type_2",
},
]
`);
});
it('throws a validation exception for invalid task definition', () => {

View file

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Joi from 'joi';
import { TaskDefinition, validateTaskDefinition } from './task';
import { Logger } from '../../../../src/core/server';
/*
* The TaskManager is the public interface into the task manager system. This glues together
* all of the disparate modules in one integration point. The task manager operates in two different ways:
*
* - pre-init, it allows middleware registration, but disallows task manipulation
* - post-init, it disallows middleware registration, but allows task manipulation
*
* Due to its complexity, this is mostly tested by integration tests (see readme).
*/
/**
* The public interface into the task manager system.
*/
export class TaskTypeDictionary {
private definitions = new Map<string, TaskDefinition>();
private logger: Logger;
constructor(logger: Logger) {
this.logger = logger;
}
[Symbol.iterator]() {
return this.definitions.entries();
}
public has(type: string) {
return this.definitions.has(type);
}
public get(type: string): TaskDefinition {
this.ensureHas(type);
return this.definitions.get(type)!;
}
public ensureHas(type: string) {
if (!this.has(type)) {
throw new Error(
`Unsupported task type "${type}". Supported types are ${[...this.definitions.keys()].join(
', '
)}`
);
}
}
/**
* Method for allowing consumers to register task definitions into the system.
* @param taskDefinitions - The Kibana task definitions dictionary
*/
public registerTaskDefinitions(taskDefinitions: Record<string, Omit<TaskDefinition, 'type'>>) {
const duplicate = Object.keys(taskDefinitions).find((type) => this.definitions.has(type));
if (duplicate) {
throw new Error(`Task ${duplicate} is already defined!`);
}
try {
for (const definition of sanitizeTaskDefinitions(taskDefinitions)) {
this.definitions.set(definition.type, definition);
}
} catch (e) {
this.logger.error('Could not sanitize task definitions');
}
}
}
/**
* Sanitizes the system's task definitions. Task definitions have optional properties, and
* this ensures they all are given a reasonable default.
*
* @param taskDefinitions - The Kibana task definitions dictionary
*/
export function sanitizeTaskDefinitions(
taskDefinitions: Record<string, Omit<TaskDefinition, 'type'>>
): TaskDefinition[] {
return Object.entries(taskDefinitions).map(([type, rawDefinition]) =>
Joi.attempt<TaskDefinition>({ type, ...rawDefinition }, validateTaskDefinition)
);
}

View file

@ -3,6 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { loggingSystemMock } from 'src/core/server/mocks';
/*
* A handful of helper functions for testing the task manager.
@ -11,18 +12,9 @@
// Caching this here to avoid setTimeout mocking affecting our tests.
const nativeTimeout = setTimeout;
/**
* Creates a mock task manager Logger.
*/
export function mockLogger() {
return {
info: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};
return loggingSystemMock.createLogger();
}
export interface Resolvable {
resolve: () => void;
}

View file

@ -1,16 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManager as TaskManagerClass } from './task_manager';
export type TaskManager = PublicMethodsOf<TaskManagerClass>;
export interface Logger {
info(message: string): void;
debug(message: string): void;
warn(message: string): void;
error(message: string): void;
}

View file

@ -92,13 +92,11 @@ export class SampleTaskManagerFixturePlugin
taskManager.registerTaskDefinitions({
sampleTask: {
...defaultSampleTaskConfig,
type: 'sampleTask',
title: 'Sample Task',
description: 'A sample task for testing the task_manager.',
},
singleAttemptSampleTask: {
...defaultSampleTaskConfig,
type: 'singleAttemptSampleTask',
title: 'Failing Sample Task',
description:
'A sample task for testing the task_manager that fails on the first attempt to run.',

View file

@ -79,7 +79,6 @@ export class SampleTaskManagerFixturePlugin
taskManager.registerTaskDefinitions({
performanceTestTask: {
type: 'performanceTestTask',
title,
description: 'A task for stress testing task_manager.',
timeout: '1m',