mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[Task Manager] Tests for the ability to run tasks of varying durations in parallel (#51572)
This PR adds a test that ensures Task Manager is capable of picking up new tasks in parallel to a long running tasks that might otherwise hold up task execution. This doesn't add functionality - just a missing test case.
This commit is contained in:
parent
7fef618ea6
commit
cfed9c6c48
3 changed files with 125 additions and 16 deletions
|
@ -4,9 +4,20 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
const { EventEmitter } = require('events');
|
||||
|
||||
import { initRoutes } from './init_routes';
|
||||
|
||||
|
||||
const once = function (emitter, event) {
|
||||
return new Promise(resolve => {
|
||||
emitter.once(event, resolve);
|
||||
});
|
||||
};
|
||||
|
||||
export default function TaskTestingAPI(kibana) {
|
||||
const taskTestingEvents = new EventEmitter();
|
||||
|
||||
return new kibana.Plugin({
|
||||
name: 'sampleTask',
|
||||
require: ['elasticsearch', 'task_manager'],
|
||||
|
@ -52,6 +63,10 @@ export default function TaskTestingAPI(kibana) {
|
|||
refresh: true,
|
||||
});
|
||||
|
||||
if (params.waitForEvent) {
|
||||
await once(taskTestingEvents, params.waitForEvent);
|
||||
}
|
||||
|
||||
return {
|
||||
state: { count: (prevState.count || 0) + 1 },
|
||||
runAt: millisecondsFromNow(params.nextRunMilliseconds),
|
||||
|
@ -88,7 +103,7 @@ export default function TaskTestingAPI(kibana) {
|
|||
},
|
||||
});
|
||||
|
||||
initRoutes(server);
|
||||
initRoutes(server, taskTestingEvents);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
|
@ -23,11 +23,11 @@ const taskManagerQuery = {
|
|||
}
|
||||
};
|
||||
|
||||
export function initRoutes(server) {
|
||||
export function initRoutes(server, taskTestingEvents) {
|
||||
const taskManager = server.plugins.task_manager;
|
||||
|
||||
server.route({
|
||||
path: '/api/sample_tasks',
|
||||
path: '/api/sample_tasks/schedule',
|
||||
method: 'POST',
|
||||
config: {
|
||||
validate: {
|
||||
|
@ -38,26 +38,19 @@ export function initRoutes(server) {
|
|||
params: Joi.object().required(),
|
||||
state: Joi.object().optional(),
|
||||
id: Joi.string().optional()
|
||||
}),
|
||||
ensureScheduled: Joi.boolean()
|
||||
.default(false)
|
||||
.optional(),
|
||||
})
|
||||
}),
|
||||
},
|
||||
},
|
||||
async handler(request) {
|
||||
try {
|
||||
const { ensureScheduled = false, task: taskFields } = request.payload;
|
||||
const { task: taskFields } = request.payload;
|
||||
const task = {
|
||||
...taskFields,
|
||||
scope: [scope],
|
||||
};
|
||||
|
||||
const taskResult = await (
|
||||
ensureScheduled
|
||||
? taskManager.ensureScheduled(task, { request })
|
||||
: taskManager.schedule(task, { request })
|
||||
);
|
||||
const taskResult = await (taskManager.schedule(task, { request }));
|
||||
|
||||
return taskResult;
|
||||
} catch (err) {
|
||||
|
@ -66,6 +59,60 @@ export function initRoutes(server) {
|
|||
},
|
||||
});
|
||||
|
||||
server.route({
|
||||
path: '/api/sample_tasks/ensure_scheduled',
|
||||
method: 'POST',
|
||||
config: {
|
||||
validate: {
|
||||
payload: Joi.object({
|
||||
task: Joi.object({
|
||||
taskType: Joi.string().required(),
|
||||
interval: Joi.string().optional(),
|
||||
params: Joi.object().required(),
|
||||
state: Joi.object().optional(),
|
||||
id: Joi.string().optional()
|
||||
})
|
||||
}),
|
||||
},
|
||||
},
|
||||
async handler(request) {
|
||||
try {
|
||||
const { task: taskFields } = request.payload;
|
||||
const task = {
|
||||
...taskFields,
|
||||
scope: [scope],
|
||||
};
|
||||
|
||||
const taskResult = await (taskManager.ensureScheduled(task, { request }));
|
||||
|
||||
return taskResult;
|
||||
} catch (err) {
|
||||
return err;
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
server.route({
|
||||
path: '/api/sample_tasks/event',
|
||||
method: 'POST',
|
||||
config: {
|
||||
validate: {
|
||||
payload: Joi.object({
|
||||
event: Joi.string().required()
|
||||
}),
|
||||
},
|
||||
},
|
||||
async handler(request) {
|
||||
try {
|
||||
const { event } = request.payload;
|
||||
taskTestingEvents.emit(event);
|
||||
return { event };
|
||||
} catch (err) {
|
||||
return err;
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
server.route({
|
||||
path: '/api/sample_tasks',
|
||||
method: 'GET',
|
||||
|
|
|
@ -58,7 +58,7 @@ export default function ({ getService }) {
|
|||
}
|
||||
|
||||
function scheduleTask(task) {
|
||||
return supertest.post('/api/sample_tasks')
|
||||
return supertest.post('/api/sample_tasks/schedule')
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.send({ task })
|
||||
.expect(200)
|
||||
|
@ -66,13 +66,20 @@ export default function ({ getService }) {
|
|||
}
|
||||
|
||||
function scheduleTaskIfNotExists(task) {
|
||||
return supertest.post('/api/sample_tasks')
|
||||
return supertest.post('/api/sample_tasks/ensure_scheduled')
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.send({ task, ensureScheduled: true })
|
||||
.send({ task })
|
||||
.expect(200)
|
||||
.then((response) => response.body);
|
||||
}
|
||||
|
||||
function releaseTasksWaitingForEventToComplete(event) {
|
||||
return supertest.post('/api/sample_tasks/event')
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.send({ event })
|
||||
.expect(200);
|
||||
}
|
||||
|
||||
it('should support middleware', async () => {
|
||||
const historyItem = _.random(1, 100);
|
||||
|
||||
|
@ -204,5 +211,45 @@ export default function ({ getService }) {
|
|||
expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(expectedDiff - buffer);
|
||||
expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.lessThan(expectedDiff + buffer);
|
||||
}
|
||||
|
||||
it('should run tasks in parallel, allowing for long running tasks along side faster tasks', async () => {
|
||||
/**
|
||||
* It's worth noting this test relies on the /event endpoint that forces Task Manager to hold off
|
||||
* on completing a task until a call is made by the test suite.
|
||||
* If we begin testing with multiple Kibana instacnes in Parallel this will likely become flaky.
|
||||
* If you end up here because the test is flaky, this might be why.
|
||||
*/
|
||||
const fastTask = await scheduleTask({
|
||||
taskType: 'sampleTask',
|
||||
interval: `1s`,
|
||||
params: { },
|
||||
});
|
||||
|
||||
const longRunningTask = await scheduleTask({
|
||||
taskType: 'sampleTask',
|
||||
interval: `1s`,
|
||||
params: {
|
||||
waitForEvent: 'rescheduleHasHappened'
|
||||
},
|
||||
});
|
||||
|
||||
function getTaskById(tasks, id) {
|
||||
return tasks.filter(task => task.id === id)[0];
|
||||
}
|
||||
|
||||
await retry.try(async () => {
|
||||
const tasks = (await currentTasks()).docs;
|
||||
expect(getTaskById(tasks, fastTask.id).state.count).to.eql(2);
|
||||
});
|
||||
|
||||
await releaseTasksWaitingForEventToComplete('rescheduleHasHappened');
|
||||
|
||||
await retry.try(async () => {
|
||||
const tasks = (await currentTasks()).docs;
|
||||
|
||||
expect(getTaskById(tasks, fastTask.id).state.count).to.greaterThan(2);
|
||||
expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue