Fixes Failing test: Jest Integration Tests.x-pack/platform/plugins/shared/task_manager/server/integration_tests - capacity based claiming should claim tasks to full capacity (#201681)

Resolves https://github.com/elastic/kibana/issues/205949,
https://github.com/elastic/kibana/issues/191117

## Summary

Trying to fix flaky integration test by performing a bulk create for the
test tasks instead of creating one by one. After making this change, was
able to run the integration test ~100 times without failure.

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2025-02-05 16:20:38 -05:00 committed by GitHub
parent 6cab1dc6f8
commit 7f28ae63e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 38 additions and 9 deletions

View file

@ -5,6 +5,6 @@
* 2.0.
*/
export { injectTask } from './inject_task';
export { injectTask, injectTaskBulk } from './inject_task';
export { setupTestServers } from './setup_test_servers';
export { retry } from './retry';

View file

@ -33,3 +33,27 @@ export async function injectTask(
},
});
}
export async function injectTaskBulk(esClient: ElasticsearchClient, tasks: ConcreteTaskInstance[]) {
const bulkRequest = [];
for (const task of tasks) {
bulkRequest.push({ create: { _id: `task:${task.id}` } });
bulkRequest.push({
references: [],
type: 'task',
updated_at: new Date().toISOString(),
task: {
...task,
state: JSON.stringify(task.state),
params: JSON.stringify(task.params),
runAt: task.runAt.toISOString(),
scheduledAt: task.scheduledAt.toISOString(),
partition: murmurhash.v3(task.id) % MAX_PARTITIONS,
},
});
}
await esClient.bulk({
index: '.kibana_task_manager',
body: bulkRequest,
});
}

View file

@ -12,7 +12,7 @@ import { times } from 'lodash';
import { TaskCost, TaskStatus } from '../task';
import type { TaskClaimingOpts } from '../queries/task_claiming';
import { TaskManagerPlugin, type TaskManagerStartContract } from '../plugin';
import { injectTask, setupTestServers, retry } from './lib';
import { injectTaskBulk, setupTestServers, retry } from './lib';
import { CreateMonitoringStatsOpts } from '../monitoring';
import { filter, map } from 'rxjs';
import { isTaskManagerWorkerUtilizationStatEvent } from '../task_events';
@ -94,8 +94,7 @@ jest.mock('../queries/task_claiming', () => {
const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');
// FLAKY: https://github.com/elastic/kibana/issues/191117
describe.skip('capacity based claiming', () => {
describe('capacity based claiming', () => {
const taskIdsToRemove: string[] = [];
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
@ -170,9 +169,10 @@ describe.skip('capacity based claiming', () => {
times(10, () => ids.push(uuidV4()));
const now = new Date();
const runAt = new Date(now.valueOf() + 5000);
const runAt = new Date(now.valueOf() + 6000);
const tasks = [];
for (const id of ids) {
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
tasks.push({
id,
taskType: '_normalCostType',
params: {},
@ -190,6 +190,8 @@ describe.skip('capacity based claiming', () => {
taskIdsToRemove.push(id);
}
await injectTaskBulk(kibanaServer.coreStart.elasticsearch.client.asInternalUser, tasks);
await retry(async () => {
expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(10);
});
@ -235,8 +237,9 @@ describe.skip('capacity based claiming', () => {
const ids: string[] = [];
times(6, () => ids.push(uuidV4()));
const runAt1 = new Date(now.valueOf() - 5);
const tasks = [];
for (const id of ids) {
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
tasks.push({
id,
taskType: '_normalCostType',
params: {},
@ -257,7 +260,7 @@ describe.skip('capacity based claiming', () => {
// inject 1 XL cost task that will put us over the max cost capacity of 20
const xlid = uuidV4();
const runAt2 = now;
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
tasks.push({
id: xlid,
taskType: '_xlCostType',
params: {},
@ -277,7 +280,7 @@ describe.skip('capacity based claiming', () => {
// inject one more normal cost task
const runAt3 = new Date(now.valueOf() + 5);
const lastid = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
tasks.push({
id: lastid,
taskType: '_normalCostType',
params: {},
@ -294,6 +297,8 @@ describe.skip('capacity based claiming', () => {
});
taskIdsToRemove.push(lastid);
await injectTaskBulk(kibanaServer.coreStart.elasticsearch.client.asInternalUser, tasks);
// retry until all tasks have been run
await retry(async () => {
expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(7);