mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
[Task Manager] Add caching to the task partitioning logic (#189562)
Resolves https://github.com/elastic/kibana/issues/189119 ## Summary This PR adds a mechanism to keep the node's calculated partitions in cache for 10 seconds before calling the discovery service again and recalculating them. ### Checklist - [ ] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios ### To verify - Add the following to kibana.yml: ``` xpack.task_manager.claim_strategy: 'unsafe_mget' ``` - Verify that the cache is being updated every 10 seconds; you can test this by adding a log statement with a timestamp in the TaskPartitioner code that was updated - Verify that on startup the cache is updated on the initial claiming cycle
This commit is contained in:
parent
8cffec5483
commit
d79bdfd915
4 changed files with 85 additions and 25 deletions
|
@ -9,7 +9,7 @@ import {
|
|||
createDiscoveryServiceMock,
|
||||
createFindSO,
|
||||
} from '../kibana_discovery_service/mock_kibana_discovery_service';
|
||||
import { TaskPartitioner } from './task_partitioner';
|
||||
import { CACHE_INTERVAL, TaskPartitioner } from './task_partitioner';
|
||||
|
||||
const POD_NAME = 'test-pod';
|
||||
|
||||
|
@ -47,24 +47,61 @@ describe('getPodName()', () => {
|
|||
describe('getPartitions()', () => {
|
||||
const lastSeen = '2024-08-10T10:00:00.000Z';
|
||||
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
|
||||
discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([
|
||||
createFindSO(POD_NAME, lastSeen),
|
||||
createFindSO('test-pod-2', lastSeen),
|
||||
createFindSO('test-pod-3', lastSeen),
|
||||
]);
|
||||
const expectedPartitions = [
|
||||
0, 1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 21, 22, 24, 25, 27, 28, 30, 31, 33, 34, 36, 37,
|
||||
39, 40, 42, 43, 45, 46, 48, 49, 51, 52, 54, 55, 57, 58, 60, 61, 63, 64, 66, 67, 69, 70, 72, 73,
|
||||
75, 76, 78, 79, 81, 82, 84, 85, 87, 88, 90, 91, 93, 94, 96, 97, 99, 100, 102, 103, 105, 106,
|
||||
108, 109, 111, 112, 114, 115, 117, 118, 120, 121, 123, 124, 126, 127, 129, 130, 132, 133, 135,
|
||||
136, 138, 139, 141, 142, 144, 145, 147, 148, 150, 151, 153, 154, 156, 157, 159, 160, 162, 163,
|
||||
165, 166, 168, 169, 171, 172, 174, 175, 177, 178, 180, 181, 183, 184, 186, 187, 189, 190, 192,
|
||||
193, 195, 196, 198, 199, 201, 202, 204, 205, 207, 208, 210, 211, 213, 214, 216, 217, 219, 220,
|
||||
222, 223, 225, 226, 228, 229, 231, 232, 234, 235, 237, 238, 240, 241, 243, 244, 246, 247, 249,
|
||||
250, 252, 253, 255,
|
||||
];
|
||||
|
||||
beforeEach(() => {
|
||||
jest.useFakeTimers();
|
||||
discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([
|
||||
createFindSO(POD_NAME, lastSeen),
|
||||
createFindSO('test-pod-2', lastSeen),
|
||||
createFindSO('test-pod-3', lastSeen),
|
||||
]);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.clearAllTimers();
|
||||
});
|
||||
|
||||
test('correctly gets the partitons for this pod', async () => {
|
||||
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
|
||||
expect(await taskPartitioner.getPartitions()).toEqual([
|
||||
0, 1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 21, 22, 24, 25, 27, 28, 30, 31, 33, 34, 36,
|
||||
37, 39, 40, 42, 43, 45, 46, 48, 49, 51, 52, 54, 55, 57, 58, 60, 61, 63, 64, 66, 67, 69, 70,
|
||||
72, 73, 75, 76, 78, 79, 81, 82, 84, 85, 87, 88, 90, 91, 93, 94, 96, 97, 99, 100, 102, 103,
|
||||
105, 106, 108, 109, 111, 112, 114, 115, 117, 118, 120, 121, 123, 124, 126, 127, 129, 130, 132,
|
||||
133, 135, 136, 138, 139, 141, 142, 144, 145, 147, 148, 150, 151, 153, 154, 156, 157, 159, 160,
|
||||
162, 163, 165, 166, 168, 169, 171, 172, 174, 175, 177, 178, 180, 181, 183, 184, 186, 187, 189,
|
||||
190, 192, 193, 195, 196, 198, 199, 201, 202, 204, 205, 207, 208, 210, 211, 213, 214, 216, 217,
|
||||
219, 220, 222, 223, 225, 226, 228, 229, 231, 232, 234, 235, 237, 238, 240, 241, 243, 244, 246,
|
||||
247, 249, 250, 252, 253, 255,
|
||||
]);
|
||||
expect(await taskPartitioner.getPartitions()).toEqual(expectedPartitions);
|
||||
});
|
||||
|
||||
test('correctly caches the partitions on 10 second interval', async () => {
|
||||
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
|
||||
const shorterInterval = CACHE_INTERVAL / 2;
|
||||
|
||||
await taskPartitioner.getPartitions();
|
||||
|
||||
jest.advanceTimersByTime(shorterInterval);
|
||||
await taskPartitioner.getPartitions();
|
||||
|
||||
jest.advanceTimersByTime(shorterInterval);
|
||||
await taskPartitioner.getPartitions();
|
||||
|
||||
expect(discoveryServiceMock.getActiveKibanaNodes).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
test('correctly catches the error from the discovery service and returns the cached value', async () => {
|
||||
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
|
||||
|
||||
await taskPartitioner.getPartitions();
|
||||
expect(taskPartitioner.getPodPartitions()).toEqual(expectedPartitions);
|
||||
|
||||
discoveryServiceMock.getActiveKibanaNodes.mockRejectedValueOnce([]);
|
||||
jest.advanceTimersByTime(CACHE_INTERVAL);
|
||||
await taskPartitioner.getPartitions();
|
||||
expect(taskPartitioner.getPodPartitions()).toEqual(expectedPartitions);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -17,16 +17,21 @@ function range(start: number, end: number) {
|
|||
}
|
||||
|
||||
export const MAX_PARTITIONS = 256;
|
||||
export const CACHE_INTERVAL = 10000;
|
||||
|
||||
export class TaskPartitioner {
|
||||
private readonly allPartitions: number[];
|
||||
private readonly podName: string;
|
||||
private kibanaDiscoveryService: KibanaDiscoveryService;
|
||||
private podPartitions: number[];
|
||||
private podPartitionsLastUpdated: number;
|
||||
|
||||
constructor(podName: string, kibanaDiscoveryService: KibanaDiscoveryService) {
|
||||
this.allPartitions = range(0, MAX_PARTITIONS);
|
||||
this.podName = podName;
|
||||
this.kibanaDiscoveryService = kibanaDiscoveryService;
|
||||
this.podPartitions = [];
|
||||
this.podPartitionsLastUpdated = Date.now() - CACHE_INTERVAL;
|
||||
}
|
||||
|
||||
getAllPartitions(): number[] {
|
||||
|
@ -37,10 +42,26 @@ export class TaskPartitioner {
|
|||
return this.podName;
|
||||
}
|
||||
|
||||
getPodPartitions(): number[] {
|
||||
return this.podPartitions;
|
||||
}
|
||||
|
||||
async getPartitions(): Promise<number[]> {
|
||||
const allPodNames = await this.getAllPodNames();
|
||||
const podPartitions = assignPodPartitions(this.podName, allPodNames, this.allPartitions);
|
||||
return podPartitions;
|
||||
const lastUpdated = new Date(this.podPartitionsLastUpdated).getTime();
|
||||
const now = Date.now();
|
||||
|
||||
// update the pod partitions cache after 10 seconds
|
||||
if (now - lastUpdated >= CACHE_INTERVAL) {
|
||||
try {
|
||||
const allPodNames = await this.getAllPodNames();
|
||||
this.podPartitions = assignPodPartitions(this.podName, allPodNames, this.allPartitions);
|
||||
this.podPartitionsLastUpdated = now;
|
||||
} catch (error) {
|
||||
// return the cached value
|
||||
return this.podPartitions;
|
||||
}
|
||||
}
|
||||
return this.podPartitions;
|
||||
}
|
||||
|
||||
private async getAllPodNames(): Promise<string[]> {
|
||||
|
|
|
@ -40,10 +40,6 @@ import {
|
|||
createFindSO,
|
||||
} from '../kibana_discovery_service/mock_kibana_discovery_service';
|
||||
|
||||
jest.mock('../lib/assign_pod_partitions', () => ({
|
||||
assignPodPartitions: jest.fn().mockReturnValue([1, 3]),
|
||||
}));
|
||||
|
||||
jest.mock('../constants', () => ({
|
||||
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [
|
||||
'limitedToZero',
|
||||
|
@ -107,6 +103,7 @@ describe('TaskClaiming', () => {
|
|||
.spyOn(apm, 'startTransaction')
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
.mockImplementation(() => mockApmTrans as any);
|
||||
jest.spyOn(taskPartitioner, 'getPartitions').mockResolvedValue([1, 3]);
|
||||
});
|
||||
|
||||
describe('claimAvailableTasks', () => {
|
||||
|
|
|
@ -10,6 +10,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
|||
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
|
||||
import { taskMappings as TaskManagerMapping } from '@kbn/task-manager-plugin/server/saved_objects/mappings';
|
||||
import { asyncForEach } from '@kbn/std';
|
||||
import { setTimeout as setTimeoutAsync } from 'timers/promises';
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
|
||||
const { properties: taskManagerIndexMapping } = TaskManagerMapping;
|
||||
|
@ -154,13 +155,17 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
});
|
||||
});
|
||||
|
||||
it('should tasks with partitions assigned to this kibana node', async () => {
|
||||
it('should run tasks with partitions assigned to this kibana node', async () => {
|
||||
const partitions: Record<string, number> = {
|
||||
'0': 127,
|
||||
'1': 147,
|
||||
'2': 23,
|
||||
};
|
||||
|
||||
// wait for the pod partitions cache to update before scheduling tasks
|
||||
await updateKibanaNodes();
|
||||
await setTimeoutAsync(10000);
|
||||
|
||||
const tasksToSchedule = [];
|
||||
for (let i = 0; i < 3; i++) {
|
||||
tasksToSchedule.push(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue