Task manager to queue calls that require it to be initialized (#43589) (#43619)

* Initial work

* Add unit tests
This commit is contained in:
Mike Côté 2019-08-20 18:16:06 -04:00 committed by GitHub
parent 855b797376
commit 5a35586f46
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 114 additions and 32 deletions

View file

@ -17,12 +17,14 @@ const serializer = new SavedObjectsSerializer(new SavedObjectsSchema());
describe('TaskManager', () => {
let clock: sinon.SinonFakeTimers;
const defaultConfig = {
task_manager: {
max_workers: 10,
override_num_workers: {},
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
xpack: {
task_manager: {
max_workers: 10,
override_num_workers: {},
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
},
},
};
const config = {
@ -42,27 +44,106 @@ describe('TaskManager', () => {
afterEach(() => clock.restore());
test('disallows schedule before init', async () => {
test('allows and queues scheduling tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
const task = {
taskType: 'foo',
params: {},
state: {},
};
await expect(client.schedule(task)).rejects.toThrow(/^NotInitialized: .*/i);
savedObjectsClient.create.mockResolvedValueOnce({
id: '1',
type: 'task',
attributes: {},
references: [],
});
const promise = client.schedule(task);
client.start();
await promise;
expect(savedObjectsClient.create).toHaveBeenCalled();
});
test('disallows fetch before init', async () => {
test('allows scheduling tasks after starting', async () => {
const client = new TaskManager(taskManagerOpts);
await expect(client.fetch({})).rejects.toThrow(/^NotInitialized: .*/i);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
client.start();
const task = {
taskType: 'foo',
params: {},
state: {},
};
savedObjectsClient.create.mockResolvedValueOnce({
id: '1',
type: 'task',
attributes: {},
references: [],
});
await client.schedule(task);
expect(savedObjectsClient.create).toHaveBeenCalled();
});
test('disallows remove before init', async () => {
test('allows and queues removing tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
await expect(client.remove('23')).rejects.toThrow(/^NotInitialized: .*/i);
savedObjectsClient.delete.mockResolvedValueOnce({});
const promise = client.remove('1');
client.start();
await promise;
expect(savedObjectsClient.delete).toHaveBeenCalled();
});
test('allows middleware registration before init', () => {
test('allows removing tasks after starting', async () => {
const client = new TaskManager(taskManagerOpts);
client.start();
savedObjectsClient.delete.mockResolvedValueOnce({});
await client.remove('1');
expect(savedObjectsClient.delete).toHaveBeenCalled();
});
test('allows and queues fetching tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
taskManagerOpts.callWithInternalUser.mockResolvedValue({
hits: {
total: {
value: 0,
},
hits: [],
},
});
const promise = client.fetch({});
client.start();
await promise;
expect(taskManagerOpts.callWithInternalUser).toHaveBeenCalled();
});
test('allows fetching tasks after starting', async () => {
const client = new TaskManager(taskManagerOpts);
client.start();
taskManagerOpts.callWithInternalUser.mockResolvedValue({
hits: {
total: {
value: 0,
},
hits: [],
},
});
await client.fetch({});
expect(taskManagerOpts.callWithInternalUser).toHaveBeenCalled();
});
test('allows middleware registration before starting', () => {
const client = new TaskManager(taskManagerOpts);
const middleware = {
beforeSave: async (saveOpts: any) => saveOpts,
@ -71,7 +152,7 @@ describe('TaskManager', () => {
expect(() => client.addMiddleware(middleware)).not.toThrow();
});
test('disallows middleware registration after init', async () => {
test('disallows middleware registration after starting', async () => {
const client = new TaskManager(taskManagerOpts);
const middleware = {
beforeSave: async (saveOpts: any) => saveOpts,

View file

@ -38,7 +38,7 @@ export interface TaskManagerOpts {
* The public interface into the task manager system.
*/
export class TaskManager {
private isInitialized = false;
private isStarted = false;
private maxWorkers: number;
private overrideNumWorkers: { [taskType: string]: number };
private readonly pollerInterval: number;
@ -47,6 +47,7 @@ export class TaskManager {
private poller: TaskPoller;
private logger: Logger;
private pool: TaskPool;
private startQueue: Array<() => void> = [];
private middleware = {
beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts,
beforeRun: async (runOpts: RunContext) => runOpts,
@ -103,7 +104,10 @@ export class TaskManager {
* Starts up the task manager and starts picking up tasks.
*/
public start() {
this.isInitialized = true;
this.isStarted = true;
// Some calls are waiting until task manager is started
this.startQueue.forEach(fn => fn());
this.startQueue = [];
const startPoller = async () => {
try {
await this.poller.start();
@ -120,10 +124,19 @@ export class TaskManager {
startPoller();
}
private async waitUntilStarted() {
if (!this.isStarted) {
await new Promise(resolve => {
this.startQueue.push(resolve);
});
}
}
/**
* Stops the task manager and cancels running tasks.
*/
public stop() {
this.poller.stop();
this.pool.cancelRunningTasks();
}
@ -169,7 +182,7 @@ export class TaskManager {
* @returns {Promise<ConcreteTaskInstance>}
*/
public async schedule(taskInstance: TaskInstance, options?: any): Promise<ConcreteTaskInstance> {
this.assertInitialized('Tasks cannot be scheduled until after task manager is initialized!');
await this.waitUntilStarted();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance,
@ -186,7 +199,7 @@ export class TaskManager {
* @returns {Promise<FetchResult>}
*/
public async fetch(opts: FetchOpts): Promise<FetchResult> {
this.assertInitialized('Tasks cannot be fetched before task manager is initialized!');
await this.waitUntilStarted();
return this.store.fetch(opts);
}
@ -197,7 +210,7 @@ export class TaskManager {
* @returns {Promise<RemoveResult>}
*/
public async remove(id: string): Promise<void> {
this.assertInitialized('Tasks cannot be removed before task manager is initialized!');
await this.waitUntilStarted();
return this.store.remove(id);
}
@ -208,20 +221,8 @@ export class TaskManager {
* @returns void
*/
private assertUninitialized(message: string) {
if (this.isInitialized) {
if (this.isStarted) {
throw new Error(`Cannot ${message} after the task manager is initialized!`);
}
}
/**
* Ensures task manager IS already initialized
*
* @param {string} message shown if task manager is not initialized
* @returns void
*/
private assertInitialized(message: string) {
if (!this.isInitialized) {
throw new Error(`NotInitialized: ${message}`);
}
}
}