migrate TaskManager Plugin to the Kibana Platform (#53869)

Migrates the existing TaskManager plugin from Legacy to Kibana Platform.
We retain the Legacy API to prevent a breaking change, but under the hood, the legacy plugin is now using the Kibana Platform plugin.

Another reason we retain the Legacy plugin to support several features that the Platform team has yet to migrate to Kibana Platform (mapping, SO schema and migrations).
This commit is contained in:
Gidi Meir Morris 2020-01-13 19:09:57 +00:00 committed by GitHub
parent ec69443ca2
commit ea9a7b8a16
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
95 changed files with 999 additions and 612 deletions

View file

@ -37,6 +37,7 @@ export { elasticsearchServiceMock } from './elasticsearch/elasticsearch_service.
export { httpServiceMock } from './http/http_service.mock';
export { loggingServiceMock } from './logging/logging_service.mock';
export { savedObjectsClientMock } from './saved_objects/service/saved_objects_client.mock';
export { savedObjectsRepositoryMock } from './saved_objects/service/lib/repository.mock';
export { uiSettingsServiceMock } from './ui_settings/ui_settings_service.mock';
import { uuidServiceMock } from './uuid/uuid_service.mock';

View file

@ -4,13 +4,13 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../plugins/task_manager/server/task_manager.mock';
import { ActionTypeRegistry } from './action_type_registry';
import { ExecutorType } from './types';
import { ActionExecutor, ExecutorError, TaskRunnerFactory } from './lib';
import { configUtilsMock } from './actions_config.mock';
const mockTaskManager = taskManagerMock.create();
const mockTaskManager = taskManagerMock.setup();
const actionTypeRegistryParams = {
taskManager: mockTaskManager,
taskRunnerFactory: new TaskRunnerFactory(new ActionExecutor()),

View file

@ -6,11 +6,11 @@
import Boom from 'boom';
import { i18n } from '@kbn/i18n';
import { TaskManagerSetupContract } from './shim';
import { RunContext } from '../../task_manager/server';
import { RunContext, TaskManagerSetupContract } from '../../../../plugins/task_manager/server';
import { ExecutorError, TaskRunnerFactory } from './lib';
import { ActionType } from './types';
import { ActionsConfigurationUtilities } from './actions_config';
interface ConstructorOptions {
taskManager: TaskManagerSetupContract;
taskRunnerFactory: TaskRunnerFactory;

View file

@ -10,7 +10,7 @@ import { ActionTypeRegistry } from './action_type_registry';
import { ActionsClient } from './actions_client';
import { ExecutorType } from './types';
import { ActionExecutor, TaskRunnerFactory } from './lib';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../plugins/task_manager/server/task_manager.mock';
import { configUtilsMock } from './actions_config.mock';
import { getActionsConfigurationUtilities } from './actions_config';
@ -23,7 +23,7 @@ const defaultKibanaIndex = '.kibana';
const savedObjectsClient = savedObjectsClientMock.create();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient();
const mockTaskManager = taskManagerMock.create();
const mockTaskManager = taskManagerMock.setup();
const actionTypeRegistryParams = {
taskManager: mockTaskManager,

View file

@ -6,7 +6,7 @@
import { ActionExecutor, TaskRunnerFactory } from '../lib';
import { ActionTypeRegistry } from '../action_type_registry';
import { taskManagerMock } from '../../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../../plugins/task_manager/server/task_manager.mock';
import { registerBuiltInActionTypes } from './index';
import { Logger } from '../../../../../../src/core/server';
import { loggingServiceMock } from '../../../../../../src/core/server/mocks';
@ -20,7 +20,7 @@ export function createActionTypeRegistry(): {
} {
const logger = loggingServiceMock.create().get() as jest.Mocked<Logger>;
const actionTypeRegistry = new ActionTypeRegistry({
taskManager: taskManagerMock.create(),
taskManager: taskManagerMock.setup(),
taskRunnerFactory: new TaskRunnerFactory(new ActionExecutor()),
actionsConfigUtils: configUtilsMock,
});

View file

@ -4,11 +4,11 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../plugins/task_manager/server/task_manager.mock';
import { createExecuteFunction } from './create_execute_function';
import { savedObjectsClientMock } from '../../../../../src/core/server/mocks';
const mockTaskManager = taskManagerMock.create();
const mockTaskManager = taskManagerMock.start();
const savedObjectsClient = savedObjectsClientMock.create();
const getBasePath = jest.fn();

View file

@ -5,7 +5,7 @@
*/
import { SavedObjectsClientContract } from 'src/core/server';
import { TaskManagerStartContract } from './shim';
import { TaskManagerStartContract } from '../../../../plugins/task_manager/server';
import { GetBasePathFunction } from './types';
interface CreateExecuteFunctionOptions {

View file

@ -4,11 +4,12 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Legacy } from 'kibana';
import { Plugin } from './plugin';
import { shim, Server } from './shim';
import { shim } from './shim';
import { ActionsPlugin } from './types';
export async function init(server: Server) {
export async function init(server: Legacy.Server) {
const { initializerContext, coreSetup, coreStart, pluginsSetup, pluginsStart } = shim(server);
const plugin = new Plugin(initializerContext);

View file

@ -7,7 +7,7 @@
import sinon from 'sinon';
import { ExecutorError } from './executor_error';
import { ActionExecutor } from './action_executor';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server';
import { ConcreteTaskInstance, TaskStatus } from '../../../../../plugins/task_manager/server';
import { TaskRunnerFactory } from './task_runner_factory';
import { actionTypeRegistryMock } from '../action_type_registry.mock';
import { actionExecutorMock } from './action_executor.mock';

View file

@ -6,7 +6,7 @@
import { ActionExecutorContract } from './action_executor';
import { ExecutorError } from './executor_error';
import { RunContext } from '../../../task_manager/server';
import { RunContext } from '../../../../../plugins/task_manager/server';
import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../../plugins/encrypted_saved_objects/server';
import { ActionTaskParams, GetBasePathFunction, SpaceIdToNamespaceFunction } from '../types';

View file

@ -93,7 +93,7 @@ export class Plugin {
const actionsConfigUtils = getActionsConfigurationUtilities(config as ActionsConfigType);
const actionTypeRegistry = new ActionTypeRegistry({
taskRunnerFactory,
taskManager: plugins.task_manager,
taskManager: plugins.taskManager,
actionsConfigUtils,
});
this.taskRunnerFactory = taskRunnerFactory;
@ -164,7 +164,7 @@ export class Plugin {
});
const executeFn = createExecuteFunction({
taskManager: plugins.task_manager,
taskManager: plugins.taskManager,
getScopedSavedObjectsClient: core.savedObjects.getScopedSavedObjectsClient,
getBasePath,
});

View file

@ -8,7 +8,11 @@ import Hapi from 'hapi';
import { Legacy } from 'kibana';
import * as Rx from 'rxjs';
import { ActionsConfigType } from './types';
import { TaskManager } from '../../task_manager/server';
import {
TaskManagerStartContract,
TaskManagerSetupContract,
} from '../../../../plugins/task_manager/server';
import { getTaskManagerSetup, getTaskManagerStart } from '../../task_manager/server';
import { XPackMainPlugin } from '../../xpack_main/server/xpack_main';
import KbnServer from '../../../../../src/legacy/server/kbn_server';
import { LegacySpacesPlugin as SpacesPluginStartContract } from '../../spaces';
@ -24,16 +28,6 @@ import {
} from '../../../../../src/core/server';
import { LicensingPluginSetup } from '../../../../plugins/licensing/server';
// Extend PluginProperties to indicate which plugins are guaranteed to exist
// due to being marked as dependencies
interface Plugins extends Hapi.PluginProperties {
task_manager: TaskManager;
}
export interface Server extends Legacy.Server {
plugins: Plugins;
}
export interface KibanaConfig {
index: string;
}
@ -41,14 +35,9 @@ export interface KibanaConfig {
/**
* Shim what we're thinking setup and start contracts will look like
*/
export type TaskManagerStartContract = Pick<TaskManager, 'schedule' | 'fetch' | 'remove'>;
export type XPackMainPluginSetupContract = Pick<XPackMainPlugin, 'registerFeature'>;
export type SecurityPluginSetupContract = Pick<SecurityPlugin, '__legacyCompat'>;
export type SecurityPluginStartContract = Pick<SecurityPlugin, 'authc'>;
export type TaskManagerSetupContract = Pick<
TaskManager,
'addMiddleware' | 'registerTaskDefinitions'
>;
/**
* New platform interfaces
@ -74,7 +63,7 @@ export interface ActionsCoreStart {
}
export interface ActionsPluginsSetup {
security?: SecurityPluginSetupContract;
task_manager: TaskManagerSetupContract;
taskManager: TaskManagerSetupContract;
xpack_main: XPackMainPluginSetupContract;
encryptedSavedObjects: EncryptedSavedObjectsSetupContract;
licensing: LicensingPluginSetup;
@ -83,7 +72,7 @@ export interface ActionsPluginsStart {
security?: SecurityPluginStartContract;
spaces: () => SpacesPluginStartContract | undefined;
encryptedSavedObjects: EncryptedSavedObjectsStartContract;
task_manager: TaskManagerStartContract;
taskManager: TaskManagerStartContract;
}
/**
@ -92,7 +81,7 @@ export interface ActionsPluginsStart {
* @param server Hapi server instance
*/
export function shim(
server: Server
server: Legacy.Server
): {
initializerContext: ActionsPluginInitializerContext;
coreSetup: ActionsCoreSetup;
@ -132,7 +121,7 @@ export function shim(
const pluginsSetup: ActionsPluginsSetup = {
security: newPlatform.setup.plugins.security as SecurityPluginSetupContract | undefined,
task_manager: server.plugins.task_manager,
taskManager: getTaskManagerSetup(server)!,
xpack_main: server.plugins.xpack_main,
encryptedSavedObjects: newPlatform.setup.plugins
.encryptedSavedObjects as EncryptedSavedObjectsSetupContract,
@ -146,7 +135,7 @@ export function shim(
spaces: () => server.plugins.spaces,
encryptedSavedObjects: newPlatform.start.plugins
.encryptedSavedObjects as EncryptedSavedObjectsStartContract,
task_manager: server.plugins.task_manager,
taskManager: getTaskManagerStart(server)!,
};
return {

View file

@ -6,10 +6,9 @@
import { TaskRunnerFactory } from './task_runner';
import { AlertTypeRegistry } from './alert_type_registry';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
const taskManager = taskManagerMock.create();
import { taskManagerMock } from '../../../../plugins/task_manager/server/task_manager.mock';
const taskManager = taskManagerMock.setup();
const alertTypeRegistryParams = {
taskManager,
taskRunnerFactory: new TaskRunnerFactory(),

View file

@ -6,9 +6,8 @@
import Boom from 'boom';
import { i18n } from '@kbn/i18n';
import { RunContext, TaskManagerSetupContract } from '../../../../plugins/task_manager/server';
import { TaskRunnerFactory } from './task_runner';
import { RunContext } from '../../task_manager';
import { TaskManagerSetupContract } from './shim';
import { AlertType } from './types';
interface ConstructorOptions {

View file

@ -7,14 +7,14 @@ import uuid from 'uuid';
import { schema } from '@kbn/config-schema';
import { AlertsClient } from './alerts_client';
import { savedObjectsClientMock, loggingServiceMock } from '../../../../../src/core/server/mocks';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../plugins/task_manager/server/task_manager.mock';
import { alertTypeRegistryMock } from './alert_type_registry.mock';
import { TaskStatus } from '../../task_manager/server';
import { TaskStatus } from '../../../../plugins/task_manager/server';
import { IntervalSchedule } from './types';
import { resolvable } from './test_utils';
import { encryptedSavedObjectsMock } from '../../../../plugins/encrypted_saved_objects/server/mocks';
const taskManager = taskManagerMock.create();
const taskManager = taskManagerMock.start();
const alertTypeRegistry = alertTypeRegistryMock.create();
const savedObjectsClient = savedObjectsClientMock.create();
const encryptedSavedObjects = encryptedSavedObjectsMock.createStart();

View file

@ -22,7 +22,6 @@ import {
AlertType,
IntervalSchedule,
} from './types';
import { TaskManagerStartContract } from './shim';
import { validateAlertTypeParams } from './lib';
import {
InvalidateAPIKeyParams,
@ -30,6 +29,7 @@ import {
InvalidateAPIKeyResult as SecurityPluginInvalidateAPIKeyResult,
} from '../../../../plugins/security/server';
import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../plugins/encrypted_saved_objects/server';
import { TaskManagerStartContract } from '../../../../plugins/task_manager/server';
type NormalizedAlertAction = Omit<AlertAction, 'actionTypeId'>;
export type CreateAPIKeyResult =

View file

@ -7,7 +7,7 @@
import { Request } from 'hapi';
import { AlertsClientFactory, ConstructorOpts } from './alerts_client_factory';
import { alertTypeRegistryMock } from './alert_type_registry.mock';
import { taskManagerMock } from '../../task_manager/server/task_manager.mock';
import { taskManagerMock } from '../../../../plugins/task_manager/server/task_manager.mock';
import { KibanaRequest } from '../../../../../src/core/server';
import { loggingServiceMock } from '../../../../../src/core/server/mocks';
import { encryptedSavedObjectsMock } from '../../../../plugins/encrypted_saved_objects/server/mocks';
@ -23,7 +23,7 @@ const securityPluginSetup = {
};
const alertsClientFactoryParams: jest.Mocked<ConstructorOpts> = {
logger: loggingServiceMock.create().get(),
taskManager: taskManagerMock.create(),
taskManager: taskManagerMock.start(),
alertTypeRegistry: alertTypeRegistryMock.create(),
getSpaceId: jest.fn(),
spaceIdToNamespace: jest.fn(),

View file

@ -8,10 +8,11 @@ import Hapi from 'hapi';
import uuid from 'uuid';
import { AlertsClient } from './alerts_client';
import { AlertTypeRegistry, SpaceIdToNamespaceFunction } from './types';
import { SecurityPluginStartContract, TaskManagerStartContract } from './shim';
import { SecurityPluginStartContract } from './shim';
import { KibanaRequest, Logger } from '../../../../../src/core/server';
import { InvalidateAPIKeyParams } from '../../../../plugins/security/server';
import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../plugins/encrypted_saved_objects/server';
import { TaskManagerStartContract } from '../../../../plugins/task_manager/server';
export interface ConstructorOpts {
logger: Logger;

View file

@ -79,7 +79,7 @@ export class Plugin {
});
const alertTypeRegistry = new AlertTypeRegistry({
taskManager: plugins.task_manager,
taskManager: plugins.taskManager,
taskRunnerFactory: this.taskRunnerFactory,
});
this.alertTypeRegistry = alertTypeRegistry;
@ -116,7 +116,7 @@ export class Plugin {
const alertsClientFactory = new AlertsClientFactory({
alertTypeRegistry: this.alertTypeRegistry!,
logger: this.logger,
taskManager: plugins.task_manager,
taskManager: plugins.taskManager,
securityPluginSetup: plugins.security,
encryptedSavedObjectsPlugin: plugins.encryptedSavedObjects,
spaceIdToNamespace,

View file

@ -7,7 +7,11 @@
import Hapi from 'hapi';
import { Legacy } from 'kibana';
import { LegacySpacesPlugin as SpacesPluginStartContract } from '../../spaces';
import { TaskManager } from '../../task_manager/server';
import {
TaskManagerStartContract,
TaskManagerSetupContract,
} from '../../../../plugins/task_manager/server';
import { getTaskManagerSetup, getTaskManagerStart } from '../../task_manager/server';
import { XPackMainPlugin } from '../../xpack_main/server/xpack_main';
import KbnServer from '../../../../../src/legacy/server/kbn_server';
import {
@ -31,7 +35,6 @@ import { LicensingPluginSetup } from '../../../../plugins/licensing/server';
// due to being marked as dependencies
interface Plugins extends Hapi.PluginProperties {
actions: ActionsPlugin;
task_manager: TaskManager;
}
export interface Server extends Legacy.Server {
@ -41,17 +44,9 @@ export interface Server extends Legacy.Server {
/**
* Shim what we're thinking setup and start contracts will look like
*/
export type TaskManagerStartContract = Pick<
TaskManager,
'schedule' | 'fetch' | 'remove' | 'runNow'
>;
export type SecurityPluginSetupContract = Pick<SecurityPlugin, '__legacyCompat'>;
export type SecurityPluginStartContract = Pick<SecurityPlugin, 'authc'>;
export type XPackMainPluginSetupContract = Pick<XPackMainPlugin, 'registerFeature'>;
export type TaskManagerSetupContract = Pick<
TaskManager,
'addMiddleware' | 'registerTaskDefinitions'
>;
/**
* New platform interfaces
@ -73,7 +68,7 @@ export interface AlertingCoreStart {
}
export interface AlertingPluginsSetup {
security?: SecurityPluginSetupContract;
task_manager: TaskManagerSetupContract;
taskManager: TaskManagerSetupContract;
actions: ActionsPluginSetupContract;
xpack_main: XPackMainPluginSetupContract;
encryptedSavedObjects: EncryptedSavedObjectsSetupContract;
@ -84,7 +79,7 @@ export interface AlertingPluginsStart {
security?: SecurityPluginStartContract;
spaces: () => SpacesPluginStartContract | undefined;
encryptedSavedObjects: EncryptedSavedObjectsStartContract;
task_manager: TaskManagerStartContract;
taskManager: TaskManagerStartContract;
}
/**
@ -121,7 +116,7 @@ export function shim(
const pluginsSetup: AlertingPluginsSetup = {
security: newPlatform.setup.plugins.security as SecurityPluginSetupContract | undefined,
task_manager: server.plugins.task_manager,
taskManager: getTaskManagerSetup(server)!,
actions: server.plugins.actions.setup,
xpack_main: server.plugins.xpack_main,
encryptedSavedObjects: newPlatform.setup.plugins
@ -137,7 +132,7 @@ export function shim(
spaces: () => server.plugins.spaces,
encryptedSavedObjects: newPlatform.start.plugins
.encryptedSavedObjects as EncryptedSavedObjectsStartContract,
task_manager: server.plugins.task_manager,
taskManager: getTaskManagerStart(server)!,
};
return {

View file

@ -7,7 +7,7 @@
import sinon from 'sinon';
import { schema } from '@kbn/config-schema';
import { AlertExecutorOptions } from '../types';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager';
import { ConcreteTaskInstance, TaskStatus } from '../../../../../plugins/task_manager/server';
import { TaskRunnerContext } from './task_runner_factory';
import { TaskRunner } from './task_runner';
import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks';

View file

@ -8,7 +8,7 @@ import { pick, mapValues, omit } from 'lodash';
import { Logger } from '../../../../../../src/core/server';
import { SavedObject } from '../../../../../../src/core/server';
import { TaskRunnerContext } from './task_runner_factory';
import { ConcreteTaskInstance } from '../../../task_manager';
import { ConcreteTaskInstance } from '../../../../../plugins/task_manager/server';
import { createExecutionHandler } from './create_execution_handler';
import { AlertInstance, createAlertInstanceFactory } from '../alert_instance';
import { getNextRunAt } from './get_next_run_at';

View file

@ -5,7 +5,7 @@
*/
import sinon from 'sinon';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager';
import { ConcreteTaskInstance, TaskStatus } from '../../../../../plugins/task_manager/server';
import { TaskRunnerContext, TaskRunnerFactory } from './task_runner_factory';
import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks';
import {

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Logger } from '../../../../../../src/core/server';
import { RunContext } from '../../../task_manager';
import { RunContext } from '../../../../../plugins/task_manager/server';
import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../../plugins/encrypted_saved_objects/server';
import { PluginStartContract as ActionsPluginStartContract } from '../../../actions';
import {

View file

@ -11,6 +11,7 @@ import KbnServer, { Server } from 'src/legacy/server/kbn_server';
import mappings from './mappings.json';
import { PLUGIN_ID, getEditPath, NOT_INTERNATIONALIZED_PRODUCT_NAME } from './common';
import { lensServerPlugin } from './server';
import { getTaskManagerSetup, getTaskManagerStart } from '../task_manager/server';
export const lens: LegacyPluginInitializer = kibana => {
return new kibana.Plugin({
@ -64,6 +65,12 @@ export const lens: LegacyPluginInitializer = kibana => {
savedObjects: server.savedObjects,
config: server.config(),
server,
taskManager: getTaskManagerSetup(server)!,
});
plugin.start(kbnServer.newPlatform.start.core, {
server,
taskManager: getTaskManagerStart(server)!,
});
server.events.on('stop', () => {

View file

@ -5,28 +5,51 @@
*/
import { Server, KibanaConfig } from 'src/legacy/server/kbn_server';
import { Plugin, CoreSetup, SavedObjectsLegacyService } from 'src/core/server';
import { Plugin, CoreSetup, CoreStart, SavedObjectsLegacyService } from 'src/core/server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { Subject } from 'rxjs';
import { first } from 'rxjs/operators';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../plugins/task_manager/server';
import { setupRoutes } from './routes';
import { registerLensUsageCollector, initializeLensTelemetry } from './usage';
import {
registerLensUsageCollector,
initializeLensTelemetry,
scheduleLensTelemetry,
} from './usage';
export interface PluginSetupContract {
savedObjects: SavedObjectsLegacyService;
usageCollection: UsageCollectionSetup;
config: KibanaConfig;
server: Server;
taskManager: TaskManagerSetupContract;
}
export interface PluginStartContract {
server: Server;
taskManager: TaskManagerStartContract;
}
const taskManagerStartContract$ = new Subject<TaskManagerStartContract>();
export class LensServer implements Plugin<{}, {}, {}, {}> {
setup(core: CoreSetup, plugins: PluginSetupContract) {
setupRoutes(core, plugins);
registerLensUsageCollector(plugins.usageCollection, plugins.server);
initializeLensTelemetry(core, plugins.server);
registerLensUsageCollector(
plugins.usageCollection,
taskManagerStartContract$.pipe(first()).toPromise()
);
initializeLensTelemetry(plugins.server, plugins.taskManager);
return {};
}
start() {
start(core: CoreStart, plugins: PluginStartContract) {
scheduleLensTelemetry(plugins.server, plugins.taskManager);
taskManagerStartContract$.next(plugins.taskManager);
taskManagerStartContract$.complete();
return {};
}

View file

@ -6,32 +6,25 @@
import moment from 'moment';
import { get } from 'lodash';
import { Server } from 'src/legacy/server/kbn_server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { TaskManagerStartContract } from '../../../../../plugins/task_manager/server';
import { LensUsage, LensTelemetryState } from './types';
export function registerLensUsageCollector(usageCollection: UsageCollectionSetup, server: Server) {
export function registerLensUsageCollector(
usageCollection: UsageCollectionSetup,
taskManager: Promise<TaskManagerStartContract>
) {
let isCollectorReady = false;
async function determineIfTaskManagerIsReady() {
let isReady = false;
try {
isReady = await isTaskManagerReady(server);
} catch (err) {} // eslint-disable-line
if (isReady) {
isCollectorReady = true;
} else {
setTimeout(determineIfTaskManagerIsReady, 500);
}
}
determineIfTaskManagerIsReady();
taskManager.then(() => {
// mark lensUsageCollector as ready to collect when the TaskManager is ready
isCollectorReady = true;
});
const lensUsageCollector = usageCollection.makeUsageCollector({
type: 'lens',
fetch: async (): Promise<LensUsage> => {
try {
const docs = await getLatestTaskState(server);
const docs = await getLatestTaskState(await taskManager);
// get the accumulated state from the recurring task
const state: LensTelemetryState = get(docs, '[0].state');
@ -73,17 +66,7 @@ function addEvents(prevEvents: Record<string, number>, newEvents: Record<string,
});
}
async function isTaskManagerReady(server: Server) {
return (await getLatestTaskState(server)) !== null;
}
async function getLatestTaskState(server: Server) {
const taskManager = server.plugins.task_manager;
if (!taskManager) {
return null;
}
async function getLatestTaskState(taskManager: TaskManagerStartContract) {
try {
const result = await taskManager.fetch({
query: { bool: { filter: { term: { _id: `task:Lens-lens_telemetry` } } } },

View file

@ -6,7 +6,6 @@
import moment from 'moment';
import KbnServer, { Server } from 'src/legacy/server/kbn_server';
import { CoreSetup } from 'src/core/server';
import { CallClusterOptions } from 'src/legacy/core_plugins/elasticsearch';
import {
SearchParams,
@ -16,7 +15,12 @@ import {
} from 'elasticsearch';
import { ESSearchResponse } from '../../../apm/typings/elasticsearch';
import { XPackMainPlugin } from '../../../xpack_main/server/xpack_main';
import { RunContext } from '../../../task_manager/server';
import {
RunContext,
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../../plugins/task_manager/server';
import { getVisualizationCounts } from './visualization_counts';
// This task is responsible for running daily and aggregating all the Lens click event objects
@ -39,19 +43,21 @@ type ClusterDeleteType = (
options?: CallClusterOptions
) => Promise<DeleteDocumentByQueryResponse>;
export function initializeLensTelemetry(core: CoreSetup, server: Server) {
registerLensTelemetryTask(core, server);
scheduleTasks(server);
}
function registerLensTelemetryTask(core: CoreSetup, server: Server) {
const taskManager = server.plugins.task_manager;
export function initializeLensTelemetry(server: Server, taskManager?: TaskManagerSetupContract) {
if (!taskManager) {
server.log(['debug', 'telemetry'], `Task manager is not available`);
return;
} else {
registerLensTelemetryTask(server, taskManager);
}
}
export function scheduleLensTelemetry(server: Server, taskManager?: TaskManagerStartContract) {
if (taskManager) {
scheduleTasks(server, taskManager);
}
}
function registerLensTelemetryTask(server: Server, taskManager: TaskManagerSetupContract) {
taskManager.registerTaskDefinitions({
[TELEMETRY_TASK_TYPE]: {
title: 'Lens telemetry fetch task',
@ -62,17 +68,11 @@ function registerLensTelemetryTask(core: CoreSetup, server: Server) {
});
}
function scheduleTasks(server: Server) {
const taskManager = server.plugins.task_manager;
function scheduleTasks(server: Server, taskManager: TaskManagerStartContract) {
const { kbnServer } = (server.plugins.xpack_main as XPackMainPlugin & {
status: { plugin: { kbnServer: KbnServer } };
}).status.plugin;
if (!taskManager) {
server.log(['debug', 'telemetry'], `Task manager is not available`);
return;
}
kbnServer.afterPluginsInit(() => {
// The code block below can't await directly within "afterPluginsInit"
// callback due to circular dependency The server isn't "ready" until

View file

@ -25,24 +25,3 @@ export const getMockCallWithInternal = (hits = defaultMockSavedObjects) => {
export const getMockTaskFetch = (docs = defaultMockTaskDocs) => {
return () => Promise.resolve({ docs });
};
export const getMockKbnServer = (
mockCallWithInternal = getMockCallWithInternal(),
mockTaskFetch = getMockTaskFetch()
) => ({
plugins: {
elasticsearch: {
getCluster: () => ({
callWithInternalUser: mockCallWithInternal,
}),
},
xpack_main: {},
task_manager: {
registerTaskDefinitions: () => undefined,
schedule: () => Promise.resolve(),
fetch: mockTaskFetch,
},
},
config: () => ({ get: () => '' }),
log: () => undefined,
});

View file

@ -4,10 +4,12 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Logger, PluginInitializerContext } from 'kibana/server';
import { Logger, PluginInitializerContext, CoreStart } from 'kibana/server';
import { Legacy } from 'kibana';
import { PLUGIN_ID } from './constants';
import { OssTelemetryPlugin } from './server/plugin';
import { LegacyPluginInitializer } from '../../../../src/legacy/plugin_discovery/types';
import { getTaskManagerSetup, getTaskManagerStart } from '../task_manager/server';
export const ossTelemetry: LegacyPluginInitializer = kibana => {
return new kibana.Plugin({
@ -15,7 +17,7 @@ export const ossTelemetry: LegacyPluginInitializer = kibana => {
require: ['elasticsearch', 'xpack_main'],
configPrefix: 'xpack.oss_telemetry',
init(server) {
init(server: Legacy.Server) {
const plugin = new OssTelemetryPlugin({
logger: {
get: () =>
@ -27,14 +29,24 @@ export const ossTelemetry: LegacyPluginInitializer = kibana => {
} as Logger),
},
} as PluginInitializerContext);
plugin.setup(server.newPlatform.setup.core, {
const deps = {
usageCollection: server.newPlatform.setup.plugins.usageCollection,
taskManager: server.plugins.task_manager,
__LEGACY: {
config: server.config(),
xpackMainStatus: ((server.plugins.xpack_main as unknown) as { status: any }).status
.plugin,
},
};
plugin.setup(server.newPlatform.setup.core, {
...deps,
taskManager: getTaskManagerSetup(server),
});
plugin.start((server.newPlatform.setup.core as unknown) as CoreStart, {
...deps,
taskManager: getTaskManagerStart(server),
});
},
});

View file

@ -5,8 +5,8 @@
*/
import { registerVisualizationsCollector } from './visualizations/register_usage_collector';
import { OssTelemetrySetupDependencies } from '../../plugin';
import { OssTelemetryStartDependencies } from '../../plugin';
export function registerCollectors(deps: OssTelemetrySetupDependencies) {
export function registerCollectors(deps: OssTelemetryStartDependencies) {
registerVisualizationsCollector(deps.usageCollection, deps.taskManager);
}

View file

@ -4,29 +4,37 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { getMockTaskFetch, getMockTaskManager } from '../../../../test_utils';
import {
getMockTaskFetch,
getMockThrowingTaskFetch,
getMockTaskInstance,
} from '../../../../test_utils';
import { taskManagerMock } from '../../../../../../../plugins/task_manager/server/task_manager.mock';
import { getUsageCollector } from './get_usage_collector';
describe('getVisualizationsCollector#fetch', () => {
test('can return empty stats', async () => {
const { type, fetch } = getUsageCollector(getMockTaskManager());
const { type, fetch } = getUsageCollector(taskManagerMock.start(getMockTaskFetch()));
expect(type).toBe('visualization_types');
const fetchResult = await fetch();
expect(fetchResult).toEqual({});
});
test('provides known stats', async () => {
const mockTaskFetch = getMockTaskFetch([
{
state: {
runs: 1,
stats: { comic_books: { total: 16, max: 12, min: 2, avg: 6 } },
},
taskType: 'test',
params: {},
},
]);
const { type, fetch } = getUsageCollector(getMockTaskManager(mockTaskFetch));
const { type, fetch } = getUsageCollector(
taskManagerMock.start(
getMockTaskFetch([
getMockTaskInstance({
state: {
runs: 1,
stats: { comic_books: { total: 16, max: 12, min: 2, avg: 6 } },
},
taskType: 'test',
params: {},
}),
])
)
);
expect(type).toBe('visualization_types');
const fetchResult = await fetch();
expect(fetchResult).toEqual({ comic_books: { avg: 6, max: 12, min: 2, total: 16 } });
@ -34,20 +42,21 @@ describe('getVisualizationsCollector#fetch', () => {
describe('Error handling', () => {
test('Silently handles Task Manager NotInitialized', async () => {
const mockTaskFetch = jest.fn(() => {
throw new Error('NotInitialized taskManager is still waiting for plugins to load');
});
const { fetch } = getUsageCollector(getMockTaskManager(mockTaskFetch));
const { fetch } = getUsageCollector(
taskManagerMock.start(
getMockThrowingTaskFetch(
new Error('NotInitialized taskManager is still waiting for plugins to load')
)
)
);
const result = await fetch();
expect(result).toBe(undefined);
});
// In real life, the CollectorSet calls fetch and handles errors
test('defers the errors', async () => {
const mockTaskFetch = jest.fn(() => {
throw new Error('BOOM');
});
const { fetch } = getUsageCollector(getMockTaskManager(mockTaskFetch));
const { fetch } = getUsageCollector(
taskManagerMock.start(getMockThrowingTaskFetch(new Error('BOOM')))
);
await expect(fetch()).rejects.toThrowErrorMatchingInlineSnapshot(`"BOOM"`);
});
});

View file

@ -5,15 +5,15 @@
*/
import { get } from 'lodash';
import { PluginSetupContract as TaskManagerPluginSetupContract } from '../../../../../task_manager/server/plugin';
import { PLUGIN_ID, VIS_TELEMETRY_TASK, VIS_USAGE_TYPE } from '../../../../constants';
import { TaskManagerStartContract } from '../../../../../../../plugins/task_manager/server';
async function isTaskManagerReady(taskManager: TaskManagerPluginSetupContract | undefined) {
async function isTaskManagerReady(taskManager?: TaskManagerStartContract) {
const result = await fetch(taskManager);
return result !== null;
}
async function fetch(taskManager: TaskManagerPluginSetupContract | undefined) {
async function fetch(taskManager?: TaskManagerStartContract) {
if (!taskManager) {
return null;
}
@ -38,7 +38,7 @@ async function fetch(taskManager: TaskManagerPluginSetupContract | undefined) {
return docs;
}
export function getUsageCollector(taskManager: TaskManagerPluginSetupContract | undefined) {
export function getUsageCollector(taskManager?: TaskManagerStartContract) {
let isCollectorReady = false;
async function determineIfTaskManagerIsReady() {
let isReady = false;

View file

@ -5,12 +5,12 @@
*/
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { PluginSetupContract as TaskManagerPluginSetupContract } from '../../../../../task_manager/server/plugin';
import { TaskManagerStartContract } from '../../../../../../../plugins/task_manager/server';
import { getUsageCollector } from './get_usage_collector';
export function registerVisualizationsCollector(
collectorSet: UsageCollectionSetup,
taskManager: TaskManagerPluginSetupContract | undefined
taskManager?: TaskManagerStartContract
): void {
const collector = collectorSet.makeUsageCollector(getUsageCollector(taskManager));
collectorSet.registerCollector(collector);

View file

@ -5,12 +5,15 @@
*/
import { CoreSetup, Logger } from 'kibana/server';
import { PluginSetupContract as TaskManagerPluginSetupContract } from '../../../../task_manager/server/plugin';
import { PLUGIN_ID, VIS_TELEMETRY_TASK } from '../../../constants';
import { visualizationsTaskRunner } from './visualizations/task_runner';
import KbnServer from '../../../../../../../src/legacy/server/kbn_server';
import { LegacyConfig } from '../../plugin';
import { TaskInstance } from '../../../../task_manager/server';
import {
TaskInstance,
TaskManagerStartContract,
TaskManagerSetupContract,
} from '../../../../../../plugins/task_manager/server';
export function registerTasks({
taskManager,
@ -18,7 +21,7 @@ export function registerTasks({
elasticsearch,
config,
}: {
taskManager?: TaskManagerPluginSetupContract;
taskManager?: TaskManagerSetupContract;
logger: Logger;
elasticsearch: CoreSetup['elasticsearch'];
config: LegacyConfig;
@ -46,7 +49,7 @@ export function scheduleTasks({
xpackMainStatus,
logger,
}: {
taskManager?: TaskManagerPluginSetupContract;
taskManager?: TaskManagerStartContract;
xpackMainStatus: { kbnServer: KbnServer };
logger: Logger;
}) {

View file

@ -12,7 +12,7 @@ import {
getMockTaskInstance,
} from '../../../../test_utils';
import { visualizationsTaskRunner } from './task_runner';
import { TaskInstance } from '../../../../../task_manager/server';
import { TaskInstance } from '../../../../../../../plugins/task_manager/server';
describe('visualizationsTaskRunner', () => {
let mockTaskInstance: TaskInstance;

View file

@ -8,7 +8,7 @@ import _, { countBy, groupBy, mapValues } from 'lodash';
import { APICaller, CoreSetup } from 'kibana/server';
import { getNextMidnight } from '../../get_next_midnight';
import { VisState } from '../../../../../../../../src/legacy/core_plugins/visualizations/public';
import { TaskInstance } from '../../../../../task_manager/server';
import { TaskInstance } from '../../../../../../../plugins/task_manager/server';
import { ESSearchHit } from '../../../../../apm/typings/elasticsearch';
import { LegacyConfig } from '../../../plugin';

View file

@ -4,8 +4,11 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { CoreSetup, Logger, Plugin, PluginInitializerContext } from 'kibana/server';
import { PluginSetupContract as TaskManagerPluginSetupContract } from '../../task_manager/server/plugin';
import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'kibana/server';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../plugins/task_manager/server';
import { registerCollectors } from './lib/collectors';
import { registerTasks, scheduleTasks } from './lib/tasks';
import KbnServer from '../../../../../src/legacy/server/kbn_server';
@ -15,13 +18,18 @@ export interface LegacyConfig {
get: (key: string) => string | number | boolean;
}
export interface OssTelemetrySetupDependencies {
interface OssTelemetryDependencies {
usageCollection: UsageCollectionSetup;
__LEGACY: {
config: LegacyConfig;
xpackMainStatus: { kbnServer: KbnServer };
};
taskManager?: TaskManagerPluginSetupContract;
}
export interface OssTelemetrySetupDependencies extends OssTelemetryDependencies {
taskManager?: TaskManagerSetupContract;
}
export interface OssTelemetryStartDependencies extends OssTelemetryDependencies {
taskManager?: TaskManagerStartContract;
}
export class OssTelemetryPlugin implements Plugin {
@ -32,19 +40,20 @@ export class OssTelemetryPlugin implements Plugin {
}
public setup(core: CoreSetup, deps: OssTelemetrySetupDependencies) {
registerCollectors(deps);
registerTasks({
taskManager: deps.taskManager,
logger: this.logger,
elasticsearch: core.elasticsearch,
config: deps.__LEGACY.config,
});
}
public start(core: CoreStart, deps: OssTelemetryStartDependencies) {
registerCollectors(deps);
scheduleTasks({
taskManager: deps.taskManager,
xpackMainStatus: deps.__LEGACY.xpackMainStatus,
logger: this.logger,
});
}
public start() {}
}

View file

@ -6,13 +6,28 @@
import { APICaller, CoreSetup } from 'kibana/server';
import { TaskInstance } from '../../task_manager/server';
import { PluginSetupContract as TaskManagerPluginSetupContract } from '../../task_manager/server/plugin';
import {
ConcreteTaskInstance,
TaskStatus,
TaskManagerStartContract,
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
} from '../../../../plugins/task_manager/server';
export const getMockTaskInstance = (): TaskInstance => ({
export const getMockTaskInstance = (
overrides: Partial<ConcreteTaskInstance> = {}
): ConcreteTaskInstance => ({
state: { runs: 0, stats: {} },
taskType: 'test',
params: {},
id: '',
scheduledAt: new Date(),
attempts: 1,
status: TaskStatus.Idle,
runAt: new Date(),
startedAt: null,
retryAt: null,
ownerId: null,
...overrides,
});
const defaultMockSavedObjects = [
@ -38,8 +53,24 @@ export const getMockCallWithInternal = (hits: unknown[] = defaultMockSavedObject
}) as unknown) as APICaller;
};
export const getMockTaskFetch = (docs: TaskInstance[] = defaultMockTaskDocs) => {
return () => Promise.resolve({ docs });
export const getMockTaskFetch = (
docs: ConcreteTaskInstance[] = defaultMockTaskDocs
): Partial<jest.Mocked<TaskManagerStartContract>> => {
return {
fetch: jest.fn(fetchOpts => {
return Promise.resolve({ docs, searchAfter: [] });
}),
} as Partial<jest.Mocked<TaskManagerStartContract>>;
};
export const getMockThrowingTaskFetch = (
throws: Error
): Partial<jest.Mocked<TaskManagerStartContract>> => {
return {
fetch: jest.fn(fetchOpts => {
throw throws;
}),
} as Partial<jest.Mocked<TaskManagerStartContract>>;
};
export const getMockConfig = () => {
@ -48,13 +79,6 @@ export const getMockConfig = () => {
};
};
export const getMockTaskManager = (fetch: any = getMockTaskFetch()) =>
(({
registerTaskDefinitions: () => undefined,
ensureScheduled: () => Promise.resolve(),
fetch,
} as unknown) as TaskManagerPluginSetupContract);
export const getCluster = () => ({
callWithInternalUser: getMockCallWithInternal(),
});

View file

@ -6,19 +6,26 @@
import { Root } from 'joi';
import { Legacy } from 'kibana';
import { Plugin, PluginSetupContract } from './plugin';
import { SavedObjectsSerializer, SavedObjectsSchema } from '../../../../../src/core/server';
import mappings from './mappings.json';
import { migrations } from './migrations';
export { PluginSetupContract as TaskManager };
export {
TaskInstance,
ConcreteTaskInstance,
TaskRunCreatorFunction,
TaskStatus,
RunContext,
} from './task';
import { createLegacyApi, getTaskManagerSetup } from './legacy';
export { LegacyTaskManagerApi, getTaskManagerSetup, getTaskManagerStart } from './legacy';
// Once all plugins are migrated to NP, this can be removed
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { TaskManager } from '../../../../plugins/task_manager/server/task_manager';
const savedObjectSchemas = {
task: {
hidden: true,
isNamespaceAgnostic: true,
convertToAliasScript: `ctx._id = ctx._source.type + ':' + ctx._id`,
indexPattern(config: any) {
return config.get('xpack.task_manager.index');
},
},
};
export function taskManager(kibana: any) {
return new kibana.Plugin({
@ -28,73 +35,41 @@ export function taskManager(kibana: any) {
config(Joi: Root) {
return Joi.object({
enabled: Joi.boolean().default(true),
max_attempts: Joi.number()
.description(
'The maximum number of times a task will be attempted before being abandoned as failed'
)
.min(1)
.default(3),
poll_interval: Joi.number()
.description('How often, in milliseconds, the task manager will look for more work.')
.min(100)
.default(3000),
request_capacity: Joi.number()
.description('How many requests can Task Manager buffer before it rejects new requests.')
.min(1)
// a nice round contrived number, feel free to change as we learn how it behaves
.default(1000),
index: Joi.string()
.description('The name of the index used to store task information.')
.default('.kibana_task_manager')
.invalid(['.tasks']),
max_workers: Joi.number()
.description(
'The maximum number of tasks that this Kibana instance will run simultaneously.'
)
.min(1) // disable the task manager rather than trying to specify it with 0 workers
.default(10),
}).default();
},
init(server: Legacy.Server) {
const plugin = new Plugin({
logger: {
get: () => ({
info: (message: string) => server.log(['info', 'task_manager'], message),
debug: (message: string) => server.log(['debug', 'task_manager'], message),
warn: (message: string) => server.log(['warn', 'task_manager'], message),
error: (message: string) => server.log(['error', 'task_manager'], message),
}),
},
});
const schema = new SavedObjectsSchema(this.kbnServer.uiExports.savedObjectSchemas);
const serializer = new SavedObjectsSerializer(schema);
const setupContract = plugin.setup(
{},
{
serializer,
config: server.config(),
elasticsearch: server.plugins.elasticsearch,
savedObjects: server.savedObjects,
}
/*
* We must expose the New Platform Task Manager Plugin via the legacy Api
* as removing it now would be a breaking change - we'll remove this in v8.0.0
*/
server.expose(
createLegacyApi(
getTaskManagerSetup(server)!
.registerLegacyAPI({
savedObjectSchemas,
})
.then((taskManagerPlugin: TaskManager) => {
// we can't tell the Kibana Platform Task Manager plugin to
// to wait to `start` as that happens before legacy plugins
// instead we will start the internal Task Manager plugin when
// all legacy plugins have finished initializing
// Once all plugins are migrated to NP, this can be removed
this.kbnServer.afterPluginsInit(() => {
taskManagerPlugin.start();
});
return taskManagerPlugin;
})
)
);
this.kbnServer.afterPluginsInit(() => {
plugin.start();
});
server.expose(setupContract);
},
uiExports: {
mappings,
migrations,
savedObjectSchemas: {
task: {
hidden: true,
isNamespaceAgnostic: true,
convertToAliasScript: `ctx._id = ctx._source.type + ':' + ctx._id`,
indexPattern(config: any) {
return config.get('xpack.task_manager.index');
},
},
},
savedObjectSchemas,
},
});
}

View file

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Server } from 'src/legacy/server/kbn_server';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../plugins/task_manager/server';
import { Middleware } from '../../../../plugins/task_manager/server/lib/middleware.js';
import {
TaskDictionary,
TaskInstanceWithDeprecatedFields,
TaskInstanceWithId,
TaskDefinition,
} from '../../../../plugins/task_manager/server/task.js';
import { FetchOpts } from '../../../../plugins/task_manager/server/task_store.js';
// Once all plugins are migrated to NP and we can remove Legacy TaskManager in version 8.0.0,
// this can be removed
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { TaskManager } from '../../../../plugins/task_manager/server/task_manager';
export type LegacyTaskManagerApi = Pick<
TaskManagerSetupContract,
'addMiddleware' | 'registerTaskDefinitions'
> &
TaskManagerStartContract;
export function getTaskManagerSetup(server: Server): TaskManagerSetupContract | undefined {
return server?.newPlatform?.setup?.plugins?.taskManager as TaskManagerSetupContract;
}
export function getTaskManagerStart(server: Server): TaskManagerStartContract | undefined {
return server?.newPlatform?.start?.plugins?.taskManager as TaskManagerStartContract;
}
export function createLegacyApi(legacyTaskManager: Promise<TaskManager>): LegacyTaskManagerApi {
return {
addMiddleware: (middleware: Middleware) => {
legacyTaskManager.then((tm: TaskManager) => tm.addMiddleware(middleware));
},
registerTaskDefinitions: (taskDefinitions: TaskDictionary<TaskDefinition>) => {
legacyTaskManager.then((tm: TaskManager) => tm.registerTaskDefinitions(taskDefinitions));
},
fetch: (opts: FetchOpts) => legacyTaskManager.then((tm: TaskManager) => tm.fetch(opts)),
remove: (id: string) => legacyTaskManager.then((tm: TaskManager) => tm.remove(id)),
schedule: (taskInstance: TaskInstanceWithDeprecatedFields, options?: any) =>
legacyTaskManager.then((tm: TaskManager) => tm.schedule(taskInstance, options)),
runNow: (taskId: string) => legacyTaskManager.then((tm: TaskManager) => tm.runNow(taskId)),
ensureScheduled: (taskInstance: TaskInstanceWithId, options?: any) =>
legacyTaskManager.then((tm: TaskManager) => tm.ensureScheduled(taskInstance, options)),
};
}

View file

@ -1,73 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Plugin, LegacyDeps } from './plugin';
import { mockLogger } from './test_utils';
import { TaskManager } from './task_manager';
jest.mock('./task_manager');
describe('Task Manager Plugin', () => {
let plugin: Plugin;
const mockCoreSetup = {};
const mockLegacyDeps: LegacyDeps = {
config: {
get: jest.fn(),
},
serializer: {},
elasticsearch: {
getCluster: jest.fn(),
},
savedObjects: {
getSavedObjectsRepository: jest.fn(),
},
};
beforeEach(() => {
jest.resetAllMocks();
mockLegacyDeps.elasticsearch.getCluster.mockReturnValue({ callWithInternalUser: jest.fn() });
plugin = new Plugin({
logger: {
get: mockLogger,
},
});
});
describe('setup()', () => {
test('exposes proper contract', async () => {
const setupResult = plugin.setup(mockCoreSetup, mockLegacyDeps);
expect(setupResult).toMatchInlineSnapshot(`
Object {
"addMiddleware": [Function],
"ensureScheduled": [Function],
"fetch": [Function],
"registerTaskDefinitions": [Function],
"remove": [Function],
"runNow": [Function],
"schedule": [Function],
}
`);
});
});
describe('start()', () => {
test('properly starts up the task manager', async () => {
plugin.setup(mockCoreSetup, mockLegacyDeps);
plugin.start();
const taskManager = (TaskManager as any).mock.instances[0];
expect(taskManager.start).toHaveBeenCalled();
});
});
describe('stop()', () => {
test('properly stops up the task manager', async () => {
plugin.setup(mockCoreSetup, mockLegacyDeps);
plugin.stop();
const taskManager = (TaskManager as any).mock.instances[0];
expect(taskManager.stop).toHaveBeenCalled();
});
});
});

View file

@ -1,82 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Logger } from './types';
import { TaskManager } from './task_manager';
export interface PluginSetupContract {
fetch: TaskManager['fetch'];
remove: TaskManager['remove'];
schedule: TaskManager['schedule'];
runNow: TaskManager['runNow'];
ensureScheduled: TaskManager['ensureScheduled'];
addMiddleware: TaskManager['addMiddleware'];
registerTaskDefinitions: TaskManager['registerTaskDefinitions'];
}
export interface LegacyDeps {
config: any;
serializer: any;
elasticsearch: any;
savedObjects: any;
}
interface PluginInitializerContext {
logger: {
get: () => Logger;
};
}
export class Plugin {
private logger: Logger;
private taskManager?: TaskManager;
constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
}
// TODO: Make asynchronous like new platform
public setup(
core: {},
{ config, serializer, elasticsearch, savedObjects }: LegacyDeps
): PluginSetupContract {
const { callWithInternalUser } = elasticsearch.getCluster('admin');
const savedObjectsRepository = savedObjects.getSavedObjectsRepository(callWithInternalUser, [
'task',
]);
const taskManager = new TaskManager({
config,
savedObjectsRepository,
serializer,
callWithInternalUser,
logger: this.logger,
});
this.taskManager = taskManager;
return {
fetch: (...args) => taskManager.fetch(...args),
remove: (...args) => taskManager.remove(...args),
schedule: (...args) => taskManager.schedule(...args),
runNow: (...args) => taskManager.runNow(...args),
ensureScheduled: (...args) => taskManager.ensureScheduled(...args),
addMiddleware: (...args) => taskManager.addMiddleware(...args),
registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args),
};
}
public start() {
if (this.taskManager) {
this.taskManager.start();
}
}
public stop() {
if (this.taskManager) {
this.taskManager.stop();
}
}
}

View file

@ -4,23 +4,32 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManager } from './types';
const createTaskManagerMock = () => {
const mocked: jest.Mocked<TaskManager> = {
registerTaskDefinitions: jest.fn(),
addMiddleware: jest.fn(),
ensureScheduled: jest.fn(),
schedule: jest.fn(),
fetch: jest.fn(),
runNow: jest.fn(),
remove: jest.fn(),
start: jest.fn(),
stop: jest.fn(),
};
return mocked;
};
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../plugins/task_manager/server';
import { Subject } from 'rxjs';
export const taskManagerMock = {
create: createTaskManagerMock,
setup(overrides: Partial<jest.Mocked<TaskManagerSetupContract>> = {}) {
const mocked: jest.Mocked<TaskManagerSetupContract> = {
registerTaskDefinitions: jest.fn(),
addMiddleware: jest.fn(),
config$: new Subject(),
registerLegacyAPI: jest.fn(),
...overrides,
};
return mocked;
},
start(overrides: Partial<jest.Mocked<TaskManagerStartContract>> = {}) {
const mocked: jest.Mocked<TaskManagerStartContract> = {
ensureScheduled: jest.fn(),
schedule: jest.fn(),
fetch: jest.fn(),
runNow: jest.fn(),
remove: jest.fn(),
...overrides,
};
return mocked;
},
};

View file

@ -0,0 +1,8 @@
{
"id": "taskManager",
"server": true,
"version": "8.0.0",
"kibanaVersion": "kibana",
"configPath": ["xpack", "task_manager"],
"ui": false
}

View file

@ -55,51 +55,61 @@ Plugins define tasks by calling the `registerTaskDefinitions` method on the `ser
A sample task can be found in the [x-pack/test/plugin_api_integration/plugins/task_manager](../../test/plugin_api_integration/plugins/task_manager/index.js) folder.
```js
const taskManager = server.plugins.task_manager;
taskManager.registerTaskDefinitions({
// clusterMonitoring is the task type, and must be unique across the entire system
clusterMonitoring: {
// Human friendly name, used to represent this task in logs, UI, etc
title: 'Human friendly name',
export class Plugin {
constructor() {
}
// Optional, human-friendly, more detailed description
description: 'Amazing!!',
public setup(core: CoreSetup, plugins: { taskManager }) {
taskManager.registerTaskDefinitions({
// clusterMonitoring is the task type, and must be unique across the entire system
clusterMonitoring: {
// Human friendly name, used to represent this task in logs, UI, etc
title: 'Human friendly name',
// Optional, how long, in minutes or seconds, the system should wait before
// a running instance of this task is considered to be timed out.
// This defaults to 5 minutes.
timeout: '5m',
// Optional, human-friendly, more detailed description
description: 'Amazing!!',
// Optional, how many attempts before marking task as failed.
// This defaults to what is configured at the task manager level.
maxAttempts: 5,
// Optional, how long, in minutes or seconds, the system should wait before
// a running instance of this task is considered to be timed out.
// This defaults to 5 minutes.
timeout: '5m',
// The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots,
// 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is
// overridden by the `override_num_workers` config value, if specified.
numWorkers: 2,
// Optional, how many attempts before marking task as failed.
// This defaults to what is configured at the task manager level.
maxAttempts: 5,
// The createTaskRunner function / method returns an object that is responsible for
// performing the work of the task. context: { taskInstance }, is documented below.
createTaskRunner(context) {
return {
// Perform the work of the task. The return value should fit the TaskResult interface, documented
// below. Invalid return values will result in a logged warning.
async run() {
// Do some work
// Conditionally send some alerts
// Return some result or other...
// The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots,
// 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is
// overridden by the `override_num_workers` config value, if specified.
numWorkers: 2,
// The createTaskRunner function / method returns an object that is responsible for
// performing the work of the task. context: { taskInstance }, is documented below.
createTaskRunner(context) {
return {
// Perform the work of the task. The return value should fit the TaskResult interface, documented
// below. Invalid return values will result in a logged warning.
async run() {
// Do some work
// Conditionally send some alerts
// Return some result or other...
},
// Optional, will be called if a running instance of this task times out, allowing the task
// to attempt to clean itself up.
async cancel() {
// Do whatever is required to cancel this task, such as killing any spawned processes
},
};
},
},
});
}
// Optional, will be called if a running instance of this task times out, allowing the task
// to attempt to clean itself up.
async cancel() {
// Do whatever is required to cancel this task, such as killing any spawned processes
},
};
},
},
});
public start(core: CoreStart, plugins: { taskManager }) {
}
}
```
When Kibana attempts to claim and run a task instance, it looks its definition up, and executes its createTaskRunner's method, passing it a run context which looks like this:
@ -222,67 +232,129 @@ The data stored for a task instance looks something like this:
The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected.
### schedule
Using `schedule` you can instruct TaskManger to schedule an instance of a TaskType at some point in the future.
### Overview
Interaction with the TaskManager Plugin is done via the Kibana Platform Plugin system.
When developing your Plugin, you're asked to define a `setup` method and a `start` method.
These methods are handed Kibana's Plugin APIs for these two stages, which means you'll have access to the following apis in these two stages:
#### Setup
The _Setup_ Plugin api includes methods which configure Task Manager to support your Plugin's requirements, such as defining custom Middleware and Task Definitions.
```js
const taskManager = server.plugins.task_manager;
// Schedules a task. All properties are as documented in the previous
// storage section, except that here, params is an object, not a JSON
// string.
const task = await taskManager.schedule({
taskType,
runAt,
schedule,
params,
scope: ['my-fanci-app'],
});
// Removes the specified task
await manager.remove(task.id);
// Fetches tasks, supports pagination, via the search-after API:
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html
// If scope is not specified, all tasks are returned, otherwise only tasks
// with the given scope are returned.
const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] });
// results look something like this:
{
searchAfter: ['233322'],
// Tasks is an array of task instances
tasks: [{
id: '3242342',
taskType: 'reporting',
// etc
}]
addMiddleware: (middleware: Middleware) => {
// ...
},
registerTaskDefinitions: (taskDefinitions: TaskDictionary<TaskDefinition>) => {
// ...
},
}
```
### ensureScheduling
#### Start
The _Start_ Plugin api allow you to use Task Manager to facilitate your Plugin's behaviour, such as scheduling tasks.
```js
{
fetch: (opts: FetchOpts) => {
// ...
},
remove: (id: string) => {
// ...
},
schedule: (taskInstance: TaskInstanceWithDeprecatedFields, options?: any) => {
// ...
},
runNow: (taskId: string) => {
// ...
},
ensureScheduled: (taskInstance: TaskInstanceWithId, options?: any) => {
// ...
},
}
```
### Detailed APIs
#### schedule
Using `schedule` you can instruct TaskManger to schedule an instance of a TaskType at some point in the future.
```js
export class Plugin {
constructor() {
}
public setup(core: CoreSetup, plugins: { taskManager }) {
}
public start(core: CoreStart, plugins: { taskManager }) {
// Schedules a task. All properties are as documented in the previous
// storage section, except that here, params is an object, not a JSON
// string.
const task = await taskManager.schedule({
taskType,
runAt,
schedule,
params,
scope: ['my-fanci-app'],
});
// Removes the specified task
await taskManager.remove(task.id);
// Fetches tasks, supports pagination, via the search-after API:
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html
// If scope is not specified, all tasks are returned, otherwise only tasks
// with the given scope are returned.
const results = await taskManager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] });
}
}
```
*results* then look something like this:
```json
{
"searchAfter": ["233322"],
// Tasks is an array of task instances
"tasks": [{
"id": "3242342",
"taskType": "reporting",
// etc
}]
}
```
#### ensureScheduling
When using the `schedule` api to schedule a Task you can provide a hard coded `id` on the Task. This tells TaskManager to use this `id` to identify the Task Instance rather than generate an `id` on its own.
The danger is that in such a situation, a Task with that same `id` might already have been scheduled at some earlier point, and this would result in an error. In some cases, this is the expected behavior, but often you only care about ensuring the task has been _scheduled_ and don't need it to be scheduled a fresh.
To achieve this you should use the `ensureScheduling` api which has the exact same behavior as `schedule`, except it allows the scheduling of a Task with an `id` that's already in assigned to another Task and it will assume that the existing Task is the one you wished to `schedule`, treating this as a successful operation.
### runNow
#### runNow
Using `runNow` you can instruct TaskManger to run an existing task on-demand, without waiting for its scheduled time to be reached.
```js
const taskManager = server.plugins.task_manager;
export class Plugin {
constructor() {
}
try {
const taskRunResult = await taskManager.runNow('91760f10-ba42-de9799');
// If no error is thrown, the task has completed successfully.
} catch(err: Error) {
// If running the task has failed, we throw an error with an appropriate message.
// For example, if the requested task doesnt exist: `Error: failed to run task "91760f10-ba42-de9799" as it does not exist`
// Or if, for example, the task is already running: `Error: failed to run task "91760f10-ba42-de9799" as it is currently running`
public setup(core: CoreSetup, plugins: { taskManager }) {
}
public start(core: CoreStart, plugins: { taskManager }) {
try {
const taskRunResult = await taskManager.runNow('91760f10-ba42-de9799');
// If no error is thrown, the task has completed successfully.
} catch(err: Error) {
// If running the task has failed, we throw an error with an appropriate message.
// For example, if the requested task doesnt exist: `Error: failed to run task "91760f10-ba42-de9799" as it does not exist`
// Or if, for example, the task is already running: `Error: failed to run task "91760f10-ba42-de9799" as it is currently running`
}
}
}
```
### more options
#### more options
More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time.
@ -291,35 +363,44 @@ More custom access to the tasks can be done directly via Elasticsearch, though t
The task manager exposes a middleware layer that allows modifying tasks before they are scheduled / persisted to the task manager index, and modifying tasks / the run context before a task is run.
For example:
```js
// In your plugin's init
server.plugins.task_manager.addMiddleware({
async beforeSave({ taskInstance, ...opts }) {
console.log(`About to save a task of type ${taskInstance.taskType}`);
export class Plugin {
constructor() {
}
return {
...opts,
taskInstance: {
...taskInstance,
params: {
...taskInstance.params,
example: 'Added to params!',
},
public setup(core: CoreSetup, plugins: { taskManager }) {
taskManager.addMiddleware({
async beforeSave({ taskInstance, ...opts }) {
console.log(`About to save a task of type ${taskInstance.taskType}`);
return {
...opts,
taskInstance: {
...taskInstance,
params: {
...taskInstance.params,
example: 'Added to params!',
},
},
};
},
};
},
async beforeRun({ taskInstance, ...opts }) {
console.log(`About to run ${taskInstance.taskType} ${taskInstance.id}`);
const { example, ...taskWithoutExampleProp } = taskInstance;
async beforeRun({ taskInstance, ...opts }) {
console.log(`About to run ${taskInstance.taskType} ${taskInstance.id}`);
const { example, ...taskWithoutExampleProp } = taskInstance;
return {
...opts,
taskInstance: taskWithoutExampleProp,
};
},
});
return {
...opts,
taskInstance: taskWithoutExampleProp,
};
},
});
}
public start(core: CoreStart, plugins: { taskManager }) {
}
}
```
## Task Poller: polling for work

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { configSchema } from './config';
describe('config validation', () => {
test('task manager defaults', () => {
const config: Record<string, any> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"enabled": true,
"index": ".kibana_task_manager",
"max_attempts": 3,
"max_workers": 10,
"poll_interval": 3000,
"request_capacity": 1000,
}
`);
});
test('the ElastiSearch Tasks index cannot be used for task manager', () => {
const config: Record<string, any> = {
index: '.tasks',
};
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"[index]: \\".tasks\\" is an invalid Kibana Task Manager index, as it is already in use by the ElasticSearch Tasks Manager"`
);
});
});

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema, TypeOf } from '@kbn/config-schema';
export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }),
/* The maximum number of times a task will be attempted before being abandoned as failed */
max_attempts: schema.number({
defaultValue: 3,
min: 1,
}),
/* How often, in milliseconds, the task manager will look for more work. */
poll_interval: schema.number({
defaultValue: 3000,
min: 100,
}),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
defaultValue: 1000,
min: 1,
}),
/* The name of the index used to store task information. */
index: schema.string({
defaultValue: '.kibana_task_manager',
validate: val => {
if (val.toLowerCase() === '.tasks') {
return `"${val}" is an invalid Kibana Task Manager index, as it is already in use by the ElasticSearch Tasks Manager`;
}
},
}),
/* The maximum number of tasks that this Kibana instance will run simultaneously. */
max_workers: schema.number({
defaultValue: 10,
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
}),
});
export type TaskManagerConfig = TypeOf<typeof configSchema>;

View file

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { createTaskManager, LegacyDeps } from './create_task_manager';
import { mockLogger } from './test_utils';
import { CoreSetup, UuidServiceSetup } from 'kibana/server';
import { savedObjectsRepositoryMock } from '../../../../src/core/server/mocks';
jest.mock('./task_manager');
describe('createTaskManager', () => {
const uuid: UuidServiceSetup = {
getInstanceUuid() {
return 'some-uuid';
},
};
const mockCoreSetup = {
uuid,
} as CoreSetup;
const getMockLegacyDeps = (): LegacyDeps => ({
config: {},
savedObjectSchemas: {},
elasticsearch: {
callAsInternalUser: jest.fn(),
},
savedObjectsRepository: savedObjectsRepositoryMock.create(),
logger: mockLogger(),
});
beforeEach(() => {
jest.resetAllMocks();
});
test('exposes the underlying TaskManager', async () => {
const mockLegacyDeps = getMockLegacyDeps();
const setupResult = createTaskManager(mockCoreSetup, mockLegacyDeps);
expect(setupResult).toMatchInlineSnapshot(`
TaskManager {
"addMiddleware": [MockFunction],
"assertUninitialized": [MockFunction],
"attemptToRun": [MockFunction],
"ensureScheduled": [MockFunction],
"fetch": [MockFunction],
"registerTaskDefinitions": [MockFunction],
"remove": [MockFunction],
"runNow": [MockFunction],
"schedule": [MockFunction],
"start": [MockFunction],
"stop": [MockFunction],
"waitUntilStarted": [MockFunction],
}
`);
});
});

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import {
IClusterClient,
SavedObjectsSerializer,
SavedObjectsSchema,
CoreSetup,
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { TaskManager } from './task_manager';
import { Logger } from './types';
export interface LegacyDeps {
config: any;
savedObjectSchemas: any;
elasticsearch: Pick<IClusterClient, 'callAsInternalUser'>;
savedObjectsRepository: ISavedObjectsRepository;
logger: Logger;
}
export function createTaskManager(
core: CoreSetup,
{
logger,
config,
savedObjectSchemas,
elasticsearch: { callAsInternalUser },
savedObjectsRepository,
}: LegacyDeps
) {
// as we use this Schema solely to interact with Tasks, we
// can initialise it with solely the Tasks schema
const serializer = new SavedObjectsSerializer(new SavedObjectsSchema(savedObjectSchemas));
return new TaskManager({
taskManagerId: core.uuid.getInstanceUuid(),
config,
savedObjectsRepository,
serializer,
callAsInternalUser,
logger,
});
}

View file

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { PluginInitializerContext } from 'src/core/server';
import { TaskManagerPlugin } from './plugin';
import { configSchema } from './config';
export const plugin = (initContext: PluginInitializerContext) => new TaskManagerPlugin(initContext);
export {
TaskInstance,
ConcreteTaskInstance,
TaskRunCreatorFunction,
TaskStatus,
RunContext,
} from './task';
export {
TaskManagerPlugin as TaskManager,
TaskManagerSetupContract,
TaskManagerStartContract,
} from './plugin';
export const config = {
schema: configSchema,
};

View file

@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { PluginInitializerContext, Plugin, CoreSetup } from 'src/core/server';
import { Observable, Subject } from 'rxjs';
import { first } from 'rxjs/operators';
import { once } from 'lodash';
import { TaskDictionary, TaskDefinition } from './task';
import { TaskManager } from './task_manager';
import { createTaskManager, LegacyDeps } from './create_task_manager';
import { TaskManagerConfig } from './config';
import { Middleware } from './lib/middleware';
export type PluginLegacyDependencies = Pick<LegacyDeps, 'savedObjectSchemas'>;
export type TaskManagerSetupContract = {
config$: Observable<TaskManagerConfig>;
registerLegacyAPI: (legacyDependencies: PluginLegacyDependencies) => Promise<TaskManager>;
} & Pick<TaskManager, 'addMiddleware' | 'registerTaskDefinitions'>;
export type TaskManagerStartContract = Pick<
TaskManager,
'fetch' | 'remove' | 'schedule' | 'runNow' | 'ensureScheduled'
>;
export class TaskManagerPlugin
implements Plugin<TaskManagerSetupContract, TaskManagerStartContract> {
legacyTaskManager$: Subject<TaskManager> = new Subject<TaskManager>();
taskManager: Promise<TaskManager> = this.legacyTaskManager$.pipe(first()).toPromise();
currentConfig: TaskManagerConfig;
constructor(private readonly initContext: PluginInitializerContext) {
this.initContext = initContext;
this.currentConfig = {} as TaskManagerConfig;
}
public setup(core: CoreSetup, plugins: any): TaskManagerSetupContract {
const logger = this.initContext.logger.get('taskManager');
const config$ = this.initContext.config.create<TaskManagerConfig>();
const savedObjectsRepository = core.savedObjects.createInternalRepository(['task']);
const elasticsearch = core.elasticsearch.adminClient;
return {
config$,
registerLegacyAPI: once((__LEGACY: PluginLegacyDependencies) => {
config$.subscribe(async config => {
this.legacyTaskManager$.next(
createTaskManager(core, {
logger,
config,
elasticsearch,
savedObjectsRepository,
...__LEGACY,
})
);
this.legacyTaskManager$.complete();
});
return this.taskManager;
}),
addMiddleware: (middleware: Middleware) => {
this.taskManager.then(tm => tm.addMiddleware(middleware));
},
registerTaskDefinitions: (taskDefinition: TaskDictionary<TaskDefinition>) => {
this.taskManager.then(tm => tm.registerTaskDefinitions(taskDefinition));
},
};
}
public start(): TaskManagerStartContract {
return {
fetch: (...args) => this.taskManager.then(tm => tm.fetch(...args)),
remove: (...args) => this.taskManager.then(tm => tm.remove(...args)),
schedule: (...args) => this.taskManager.then(tm => tm.schedule(...args)),
runNow: (...args) => this.taskManager.then(tm => tm.runNow(...args)),
ensureScheduled: (...args) => this.taskManager.then(tm => tm.ensureScheduled(...args)),
};
}
public stop() {
this.taskManager.then(tm => {
tm.stop();
});
}
}

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManagerSetupContract, TaskManagerStartContract } from './plugin';
import { Subject } from 'rxjs';
export const taskManagerMock = {
setup(overrides: Partial<jest.Mocked<TaskManagerSetupContract>> = {}) {
const mocked: jest.Mocked<TaskManagerSetupContract> = {
registerTaskDefinitions: jest.fn(),
addMiddleware: jest.fn(),
config$: new Subject(),
registerLegacyAPI: jest.fn(),
...overrides,
};
return mocked;
},
start(overrides: Partial<jest.Mocked<TaskManagerStartContract>> = {}) {
const mocked: jest.Mocked<TaskManagerStartContract> = {
ensureScheduled: jest.fn(),
schedule: jest.fn(),
fetch: jest.fn(),
runNow: jest.fn(),
remove: jest.fn(),
...overrides,
};
return mocked;
},
};

View file

@ -20,39 +20,33 @@ import {
awaitTaskRunResult,
TaskLifecycleEvent,
} from './task_manager';
import { savedObjectsClientMock } from '../../../../../src/core/server/mocks';
import { SavedObjectsSerializer, SavedObjectsSchema } from '../../../../../src/core/server';
import { savedObjectsRepositoryMock } from '../../../../src/core/server/mocks';
import { SavedObjectsSerializer, SavedObjectsSchema } from '../../../../src/core/server';
import { mockLogger } from './test_utils';
import { asErr, asOk } from './lib/result_type';
import { ConcreteTaskInstance, TaskLifecycleResult, TaskStatus } from './task';
const savedObjectsClient = savedObjectsClientMock.create();
const savedObjectsClient = savedObjectsRepositoryMock.create();
const serializer = new SavedObjectsSerializer(new SavedObjectsSchema());
describe('TaskManager', () => {
let clock: sinon.SinonFakeTimers;
const defaultConfig = {
xpack: {
task_manager: {
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
},
},
server: {
uuid: 'some-uuid',
},
};
const config = {
get: (path: string) => _.get(defaultConfig, path),
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
request_capacity: 1000,
};
const taskManagerOpts = {
config,
savedObjectsRepository: savedObjectsClient,
serializer,
callWithInternalUser: jest.fn(),
callAsInternalUser: jest.fn(),
logger: mockLogger(),
taskManagerId: 'some-uuid',
};
beforeEach(() => {
@ -63,21 +57,9 @@ describe('TaskManager', () => {
test('throws if no valid UUID is available', async () => {
expect(() => {
const configWithoutServerUUID = {
xpack: {
task_manager: {
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
},
},
};
new TaskManager({
...taskManagerOpts,
config: {
get: (path: string) => _.get(configWithoutServerUUID, path),
},
taskManagerId: '',
});
}).toThrowErrorMatchingInlineSnapshot(
`"TaskManager is unable to start as Kibana has no valid UUID assigned to it."`
@ -234,7 +216,7 @@ describe('TaskManager', () => {
test('allows and queues fetching tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
taskManagerOpts.callWithInternalUser.mockResolvedValue({
taskManagerOpts.callAsInternalUser.mockResolvedValue({
hits: {
total: {
value: 0,
@ -245,13 +227,13 @@ describe('TaskManager', () => {
const promise = client.fetch({});
client.start();
await promise;
expect(taskManagerOpts.callWithInternalUser).toHaveBeenCalled();
expect(taskManagerOpts.callAsInternalUser).toHaveBeenCalled();
});
test('allows fetching tasks after starting', async () => {
const client = new TaskManager(taskManagerOpts);
client.start();
taskManagerOpts.callWithInternalUser.mockResolvedValue({
taskManagerOpts.callAsInternalUser.mockResolvedValue({
hits: {
total: {
value: 0,
@ -260,7 +242,7 @@ describe('TaskManager', () => {
},
});
await client.fetch({});
expect(taskManagerOpts.callWithInternalUser).toHaveBeenCalled();
expect(taskManagerOpts.callAsInternalUser).toHaveBeenCalled();
});
test('allows middleware registration before starting', () => {
@ -282,7 +264,6 @@ describe('TaskManager', () => {
};
client.start();
expect(() => client.addMiddleware(middleware)).toThrow(
/Cannot add middleware after the task manager is initialized/i
);

View file

@ -10,8 +10,13 @@ import { performance } from 'perf_hooks';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, some, map as mapOptional } from 'fp-ts/lib/Option';
import { SavedObjectsClientContract, SavedObjectsSerializer } from '../../../../../src/core/server';
import {
SavedObjectsSerializer,
IScopedClusterClient,
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { TaskManagerConfig } from './config';
import { Logger } from './types';
import {
@ -56,10 +61,11 @@ const VERSION_CONFLICT_STATUS = 409;
export interface TaskManagerOpts {
logger: Logger;
config: any;
callWithInternalUser: any;
savedObjectsRepository: SavedObjectsClientContract;
config: TaskManagerConfig;
callAsInternalUser: IScopedClusterClient['callAsInternalUser'];
savedObjectsRepository: ISavedObjectsRepository;
serializer: SavedObjectsSerializer;
taskManagerId: string;
}
interface RunNowResult {
@ -110,7 +116,7 @@ export class TaskManager {
constructor(opts: TaskManagerOpts) {
this.logger = opts.logger;
const taskManagerId = opts.config.get('server.uuid');
const { taskManagerId } = opts;
if (!taskManagerId) {
this.logger.error(
`TaskManager is unable to start as there the Kibana UUID is invalid (value of the "server.uuid" configuration is ${taskManagerId})`
@ -123,9 +129,9 @@ export class TaskManager {
this.store = new TaskStore({
serializer: opts.serializer,
savedObjectsRepository: opts.savedObjectsRepository,
callCluster: opts.callWithInternalUser,
index: opts.config.get('xpack.task_manager.index'),
maxAttempts: opts.config.get('xpack.task_manager.max_attempts'),
callCluster: opts.callAsInternalUser,
index: opts.config.index,
maxAttempts: opts.config.max_attempts,
definitions: this.definitions,
taskManagerId: `kibana:${taskManagerId}`,
});
@ -134,12 +140,12 @@ export class TaskManager {
this.pool = new TaskPool({
logger: this.logger,
maxWorkers: opts.config.get('xpack.task_manager.max_workers'),
maxWorkers: opts.config.max_workers,
});
this.poller$ = createTaskPoller<string, FillPoolResult>({
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
bufferCapacity: opts.config.get('xpack.task_manager.request_capacity'),
pollInterval: opts.config.poll_interval,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,

View file

@ -12,7 +12,7 @@ import { TaskEvent, asTaskRunEvent, asTaskMarkRunningEvent } from './task_events
import { ConcreteTaskInstance, TaskStatus } from './task';
import { TaskManagerRunner } from './task_runner';
import { mockLogger } from './test_utils';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';
let fakeTimer: sinon.SinonFakeTimers;

View file

@ -17,13 +17,13 @@ import {
TaskLifecycleResult,
} from './task';
import { FetchOpts, StoreOpts, OwnershipClaimingOpts, TaskStore } from './task_store';
import { savedObjectsClientMock } from '../../../../../src/core/server/mocks';
import { savedObjectsRepositoryMock } from '../../../../src/core/server/mocks';
import {
SavedObjectsSerializer,
SavedObjectsSchema,
SavedObjectAttributes,
} from '../../../../../src/core/server';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server/saved_objects/service/lib/errors';
} from '../../../../src/core/server';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server/saved_objects/service/lib/errors';
import { asTaskClaimEvent, TaskEvent } from './task_events';
import { asOk, asErr } from './lib/result_type';
@ -45,7 +45,7 @@ const taskDefinitions: TaskDictionary<TaskDefinition> = {
},
};
const savedObjectsClient = savedObjectsClientMock.create();
const savedObjectsClient = savedObjectsRepositoryMock.create();
const serializer = new SavedObjectsSerializer(new SavedObjectsSchema());
beforeEach(() => jest.resetAllMocks());

View file

@ -11,12 +11,12 @@ import { Subject, Observable } from 'rxjs';
import { omit, difference } from 'lodash';
import {
SavedObjectsClientContract,
SavedObject,
SavedObjectAttributes,
SavedObjectsSerializer,
SavedObjectsRawDoc,
} from '../../../../../src/core/server';
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { asOk, asErr } from './lib/result_type';
@ -60,7 +60,7 @@ export interface StoreOpts {
taskManagerId: string;
maxAttempts: number;
definitions: TaskDictionary<TaskDefinition>;
savedObjectsRepository: SavedObjectsClientContract;
savedObjectsRepository: ISavedObjectsRepository;
serializer: SavedObjectsSerializer;
}
@ -123,7 +123,7 @@ export class TaskStore {
private callCluster: ElasticJs;
private definitions: TaskDictionary<TaskDefinition>;
private savedObjectsRepository: SavedObjectsClientContract;
private savedObjectsRepository: ISavedObjectsRepository;
private serializer: SavedObjectsSerializer;
private events$: Subject<TaskClaim>;

View file

@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManagerStartContract } from '../../../../../../plugins/task_manager/server';
const taskManagerQuery = (...filters: any[]) => ({
bool: {
filter: {
@ -38,7 +40,7 @@ export default function(kibana: any) {
},
init(server: any) {
const taskManager = server.plugins.task_manager;
const taskManager = server.newPlatform.start.plugins.taskManager as TaskManagerStartContract;
server.route({
path: '/api/alerting_tasks/{taskId}',

View file

@ -28,7 +28,11 @@ export default function TaskTestingAPI(kibana) {
},
init(server) {
const taskManager = server.plugins.task_manager;
const taskManager = {
...server.newPlatform.setup.plugins.taskManager,
...server.newPlatform.start.plugins.taskManager,
};
const legacyTaskManager = server.plugins.task_manager;
const defaultSampleTaskConfig = {
timeout: '1m',
@ -128,7 +132,7 @@ export default function TaskTestingAPI(kibana) {
},
});
initRoutes(server, taskTestingEvents);
initRoutes(server, taskManager, legacyTaskManager, taskTestingEvents);
},
});
}

View file

@ -23,9 +23,7 @@ const taskManagerQuery = {
},
};
export function initRoutes(server, taskTestingEvents) {
const taskManager = server.plugins.task_manager;
export function initRoutes(server, taskManager, legacyTaskManager, taskTestingEvents) {
server.route({
path: '/api/sample_tasks/schedule',
method: 'POST',
@ -62,6 +60,45 @@ export function initRoutes(server, taskTestingEvents) {
},
});
/*
Schedule using legacy Api
*/
server.route({
path: '/api/sample_tasks/schedule_legacy',
method: 'POST',
config: {
validate: {
payload: Joi.object({
task: Joi.object({
taskType: Joi.string().required(),
schedule: Joi.object({
interval: Joi.string(),
}).optional(),
interval: Joi.string().optional(),
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional(),
}),
}),
},
},
async handler(request) {
try {
const { task: taskFields } = request.payload;
const task = {
...taskFields,
scope: [scope],
};
const taskResult = await legacyTaskManager.schedule(task, { request });
return taskResult;
} catch (err) {
return err;
}
},
});
server.route({
path: '/api/sample_tasks/run_now',
method: 'POST',

View file

@ -74,6 +74,15 @@ export default function({ getService }) {
.then(response => response.body);
}
function scheduleTaskUsingLegacyApi(task) {
return supertest
.post('/api/sample_tasks/schedule_legacy')
.set('kbn-xsrf', 'xxx')
.send({ task })
.expect(200)
.then(response => response.body);
}
function runTaskNow(task) {
return supertest
.post('/api/sample_tasks/run_now')
@ -494,5 +503,15 @@ export default function({ getService }) {
expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1);
});
});
it('should retain the legacy api until v8.0.0', async () => {
const result = await scheduleTaskUsingLegacyApi({
id: 'task-with-legacy-api',
taskType: 'sampleTask',
params: {},
});
expect(result.id).to.be('task-with-legacy-api');
});
});
}

View file

@ -23,7 +23,10 @@ export default function TaskManagerPerformanceAPI(kibana) {
},
init(server) {
const taskManager = server.plugins.task_manager;
const taskManager = {
...server.newPlatform.setup.plugins.taskManager,
...server.newPlatform.start.plugins.taskManager,
};
const performanceState = resetPerfState({});
let lastFlush = new Date();

View file

@ -9,7 +9,10 @@ import { range, chunk } from 'lodash';
const scope = 'perf-testing';
export function initRoutes(server, performanceState) {
const taskManager = server.plugins.task_manager;
const taskManager = {
...server.newPlatform.setup.plugins.taskManager,
...server.newPlatform.start.plugins.taskManager,
};
server.route({
path: '/api/perf_tasks',

View file

@ -9,7 +9,6 @@ import 'hapi';
import { XPackMainPlugin } from '../../legacy/plugins/xpack_main/server/xpack_main';
import { SecurityPlugin } from '../../legacy/plugins/security';
import { ActionsPlugin, ActionsClient } from '../../legacy/plugins/actions';
import { TaskManager } from '../../legacy/plugins/task_manager/server';
import { AlertingPlugin, AlertsClient } from '../../legacy/plugins/alerting';
declare module 'hapi' {
@ -22,6 +21,5 @@ declare module 'hapi' {
security?: SecurityPlugin;
actions?: ActionsPlugin;
alerting?: AlertingPlugin;
task_manager?: TaskManager;
}
}

View file

@ -9,8 +9,8 @@ import 'hapi';
import { XPackMainPlugin } from '../legacy/plugins/xpack_main/server/xpack_main';
import { SecurityPlugin } from '../legacy/plugins/security';
import { ActionsPlugin, ActionsClient } from '../legacy/plugins/actions';
import { TaskManager } from '../legacy/plugins/task_manager/server';
import { AlertingPlugin, AlertsClient } from '../legacy/plugins/alerting';
import { LegacyTaskManagerApi } from '../legacy/plugins/task_manager/server';
declare module 'hapi' {
interface Request {
@ -22,6 +22,6 @@ declare module 'hapi' {
security?: SecurityPlugin;
actions?: ActionsPlugin;
alerting?: AlertingPlugin;
task_manager?: TaskManager;
task_manager?: LegacyTaskManagerApi;
}
}