[Task Manager] Add partitions to tasks and assign those task partitions to Kibana nodes (#188758)

Resolves https://github.com/elastic/kibana/issues/187700
Resolves https://github.com/elastic/kibana/issues/187698

## Summary

This is a feature branch PR to main. It merges the following PRs, which have
already been approved: https://github.com/elastic/kibana/pull/188001 and
https://github.com/elastic/kibana/pull/188368.
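
At a high level: every task document gets an integer `partition` in `[0, 256)` derived from a murmur3 hash of its id, each partition is assigned to two Kibana nodes (chosen round-robin from the sorted list of active nodes reported by the Kibana discovery service), and the `mget` claim strategy only claims tasks whose partition belongs to the current node, plus tasks that have no partition yet. A minimal sketch of the node-side assignment, assuming the helper added in this PR under the plugin's `server/lib` directory (pod names are made up):

```ts
import { assignPodPartitions } from './lib/assign_pod_partitions';

const MAX_PARTITIONS = 256;
const allPartitions = Array.from({ length: MAX_PARTITIONS }, (_, i) => i);

// Each partition is owned by two of the active Kibana nodes, assigned round-robin
// over the sorted pod names, so ownership is deterministic given the same node list.
const myPartitions = assignPodPartitions('pod-a', ['pod-a', 'pod-b', 'pod-c'], allPartitions);

// The mget claim strategy then filters its claim query down to `myPartitions`
// (plus tasks without a partition) via tasksWithPartitions().
```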

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Alexi Doak 2024-07-19 11:46:50 -07:00 committed by GitHub
parent 2c8b2ff4a5
commit 5adf5be1c8
31 changed files with 959 additions and 57 deletions

View file

@ -1101,6 +1101,7 @@
"moment-timezone": "^0.5.43",
"monaco-editor": "^0.44.0",
"monaco-yaml": "^5.1.0",
"murmurhash": "^2.0.1",
"mustache": "^2.3.2",
"node-diff3": "^3.1.2",
"node-fetch": "^2.6.7",

View file

@ -1005,6 +1005,7 @@
"attempts",
"enabled",
"ownerId",
"partition",
"retryAt",
"runAt",
"schedule",

View file

@ -3327,6 +3327,9 @@
"ownerId": {
"type": "keyword"
},
"partition": {
"type": "integer"
},
"retryAt": {
"type": "date"
},

View file

@ -161,7 +161,7 @@ describe('checking migration metadata changes on all registered SO types', () =>
"synthetics-param": "3ebb744e5571de678b1312d5c418c8188002cf5e",
"synthetics-privates-locations": "f53d799d5c9bc8454aaa32c6abc99a899b025d5c",
"tag": "e2544392fe6563e215bb677abc8b01c2601ef2dc",
"task": "d17f2fc0bf6759a070c2221ec2787ad785c680fe",
"task": "3c89a7c918d5b896a5f8800f06e9114ad7e7aea3",
"telemetry": "7b00bcf1c7b4f6db1192bb7405a6a63e78b699fd",
"threshold-explorer-view": "175306806f9fc8e13fcc1c8953ec4ba89bda1b70",
"ui-metric": "d227284528fd19904e9d972aea0a13716fc5fe24",

View file

@ -11,46 +11,13 @@ import {
ACTIVE_NODES_LOOK_BACK,
} from './kibana_discovery_service';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import {
SavedObjectsBulkDeleteResponse,
SavedObjectsFindResponse,
SavedObjectsFindResult,
SavedObjectsUpdateResponse,
} from '@kbn/core/server';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';
const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';
const createNodeRecord = (id: string = '1', lastSeen: string = now): BackgroundTaskNode => ({
id,
last_seen: lastSeen,
});
const createFindSO = (
id: string = currentNode,
lastSeen: string = now
): SavedObjectsFindResult<BackgroundTaskNode> => ({
attributes: createNodeRecord(id, lastSeen),
id: `${BACKGROUND_TASK_NODE_SO_NAME}:${id}`,
namespaces: ['default'],
references: [],
score: 1,
type: BACKGROUND_TASK_NODE_SO_NAME,
updated_at: new Date().toDateString(),
version: '1',
});
const createFindResponse = (
soList: Array<SavedObjectsFindResult<BackgroundTaskNode>>
): SavedObjectsFindResponse<BackgroundTaskNode, unknown> => ({
total: 1,
per_page: 10000,
page: 1,
saved_objects: soList,
});
describe('KibanaDiscoveryService', () => {
const savedObjectsRepository = savedObjectsRepositoryMock.create();
const logger = loggingSystemMock.createLogger();

View file

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { savedObjectsRepositoryMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { SavedObjectsFindResponse, SavedObjectsFindResult } from '@kbn/core/server';
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { KibanaDiscoveryService } from './kibana_discovery_service';
export const createDiscoveryServiceMock = (currentNode: string) => {
const savedObjectsRepository = savedObjectsRepositoryMock.create();
const logger = loggingSystemMock.createLogger();
const discoveryService = new KibanaDiscoveryService({
savedObjectsRepository,
logger,
currentNode,
});
for (const method of ['getActiveKibanaNodes'] as Array<keyof KibanaDiscoveryService>) {
jest.spyOn(discoveryService, method);
}
return discoveryService as jest.Mocked<KibanaDiscoveryService>;
};
export const createNodeRecord = (id: string, lastSeen: string): BackgroundTaskNode => ({
id,
last_seen: lastSeen,
});
export const createFindSO = (
id: string,
lastSeen: string
): SavedObjectsFindResult<BackgroundTaskNode> => ({
attributes: createNodeRecord(id, lastSeen),
id: `${BACKGROUND_TASK_NODE_SO_NAME}:${id}`,
namespaces: ['default'],
references: [],
score: 1,
type: BACKGROUND_TASK_NODE_SO_NAME,
updated_at: new Date().toDateString(),
version: '1',
});
export const createFindResponse = (
soList: Array<SavedObjectsFindResult<BackgroundTaskNode>>
): SavedObjectsFindResponse<BackgroundTaskNode, unknown> => ({
total: 1,
per_page: 10000,
page: 1,
saved_objects: soList,
});

View file

@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { assignPodPartitions, getPartitionMap } from './assign_pod_partitions';
describe('assignPodPartitions', () => {
test('two pods', () => {
const allPods = ['foo', 'bar'];
const allPartitions = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const map = getPartitionMap(allPods, allPartitions);
expect(map).toMatchInlineSnapshot(`
Object {
"1": Array [
"bar",
"foo",
],
"10": Array [
"bar",
"foo",
],
"2": Array [
"bar",
"foo",
],
"3": Array [
"bar",
"foo",
],
"4": Array [
"bar",
"foo",
],
"5": Array [
"bar",
"foo",
],
"6": Array [
"bar",
"foo",
],
"7": Array [
"bar",
"foo",
],
"8": Array [
"bar",
"foo",
],
"9": Array [
"bar",
"foo",
],
}
`);
});
test('three pods', () => {
const allPods = ['foo', 'bar', 'quz'];
const allPartitions = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const map = getPartitionMap(allPods, allPartitions);
expect(map).toMatchInlineSnapshot(`
Object {
"1": Array [
"bar",
"foo",
],
"10": Array [
"bar",
"foo",
],
"2": Array [
"quz",
"bar",
],
"3": Array [
"foo",
"quz",
],
"4": Array [
"bar",
"foo",
],
"5": Array [
"quz",
"bar",
],
"6": Array [
"foo",
"quz",
],
"7": Array [
"bar",
"foo",
],
"8": Array [
"quz",
"bar",
],
"9": Array [
"foo",
"quz",
],
}
`);
const fooPartitions = assignPodPartitions('foo', allPods, allPartitions);
expect(fooPartitions).toMatchInlineSnapshot(`
Array [
1,
3,
4,
6,
7,
9,
10,
]
`);
const barPartitions = assignPodPartitions('bar', allPods, allPartitions);
expect(barPartitions).toMatchInlineSnapshot(`
Array [
1,
2,
4,
5,
7,
8,
10,
]
`);
const quzPartitions = assignPodPartitions('quz', allPods, allPartitions);
expect(quzPartitions).toMatchInlineSnapshot(`
Array [
2,
3,
5,
6,
8,
9,
]
`);
});
});

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
const KIBANAS_PER_PARTITION = 2;
export function getPartitionMap(podNames: string[], partitions: number[]): Record<number, string[]> {
const map: Record<number, string[]> = {};
let counter = 0;
for (const partition of partitions) {
map[partition] = [];
for (let i = 0; i < KIBANAS_PER_PARTITION; i++) {
map[partition].push(podNames.sort()[counter++ % podNames.length]);
}
}
return map;
}
export function assignPodPartitions(
podName: string,
podNames: string[],
partitions: number[]
): number[] {
const map = getPartitionMap(podNames, partitions);
const podPartitions: number[] = [];
for (const partition of Object.keys(map)) {
if (map[Number(partition)].indexOf(podName) !== -1) {
podPartitions.push(Number(partition));
}
}
return podPartitions;
}
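
For reference, a quick usage sketch of the helpers above; the inputs and expected outputs mirror the unit test earlier in this diff:

```ts
import { assignPodPartitions, getPartitionMap } from './assign_pod_partitions';

const pods = ['foo', 'bar', 'quz'];
const partitions = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

// Every partition is owned by two pods, assigned round-robin over the sorted pod names:
// { 1: ['bar', 'foo'], 2: ['quz', 'bar'], 3: ['foo', 'quz'], 4: ['bar', 'foo'], ... }
getPartitionMap(pods, partitions);

// A pod's partitions are simply the partitions whose owner list contains it.
assignPodPartitions('foo', pods, partitions); // [1, 3, 4, 6, 7, 9, 10]
```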

View file

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
createDiscoveryServiceMock,
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';
import { TaskPartitioner } from './task_partitioner';
const POD_NAME = 'test-pod';
describe('getAllPartitions()', () => {
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
test('correctly sets allPartitions in constructor', () => {
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
expect(taskPartitioner.getAllPartitions()).toEqual([
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25,
26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48,
49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94,
95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114,
115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133,
134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152,
153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171,
172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190,
191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209,
210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228,
229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247,
248, 249, 250, 251, 252, 253, 254, 255,
]);
});
});
describe('getPodName()', () => {
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
test('correctly sets podName in constructor', () => {
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
expect(taskPartitioner.getPodName()).toEqual('test-pod');
});
});
describe('getPartitions()', () => {
const lastSeen = '2024-08-10T10:00:00.000Z';
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([
createFindSO(POD_NAME, lastSeen),
createFindSO('test-pod-2', lastSeen),
createFindSO('test-pod-3', lastSeen),
]);
test('correctly gets the partitions for this pod', async () => {
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
expect(await taskPartitioner.getPartitions()).toEqual([
0, 1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 21, 22, 24, 25, 27, 28, 30, 31, 33, 34, 36,
37, 39, 40, 42, 43, 45, 46, 48, 49, 51, 52, 54, 55, 57, 58, 60, 61, 63, 64, 66, 67, 69, 70,
72, 73, 75, 76, 78, 79, 81, 82, 84, 85, 87, 88, 90, 91, 93, 94, 96, 97, 99, 100, 102, 103,
105, 106, 108, 109, 111, 112, 114, 115, 117, 118, 120, 121, 123, 124, 126, 127, 129, 130, 132,
133, 135, 136, 138, 139, 141, 142, 144, 145, 147, 148, 150, 151, 153, 154, 156, 157, 159, 160,
162, 163, 165, 166, 168, 169, 171, 172, 174, 175, 177, 178, 180, 181, 183, 184, 186, 187, 189,
190, 192, 193, 195, 196, 198, 199, 201, 202, 204, 205, 207, 208, 210, 211, 213, 214, 216, 217,
219, 220, 222, 223, 225, 226, 228, 229, 231, 232, 234, 235, 237, 238, 240, 241, 243, 244, 246,
247, 249, 250, 252, 253, 255,
]);
});
});

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { KibanaDiscoveryService } from '../kibana_discovery_service';
import { assignPodPartitions } from './assign_pod_partitions';
function range(start: number, end: number) {
const nums: number[] = [];
for (let i = start; i < end; ++i) {
nums.push(i);
}
return nums;
}
export const MAX_PARTITIONS = 256;
export class TaskPartitioner {
private readonly allPartitions: number[];
private readonly podName: string;
private kibanaDiscoveryService: KibanaDiscoveryService;
constructor(podName: string, kibanaDiscoveryService: KibanaDiscoveryService) {
this.allPartitions = range(0, MAX_PARTITIONS);
this.podName = podName;
this.kibanaDiscoveryService = kibanaDiscoveryService;
}
getAllPartitions(): number[] {
return this.allPartitions;
}
getPodName(): string {
return this.podName;
}
async getPartitions(): Promise<number[]> {
const allPodNames = await this.getAllPodNames();
const podPartitions = assignPodPartitions(this.podName, allPodNames, this.allPartitions);
return podPartitions;
}
private async getAllPodNames(): Promise<string[]> {
const nodes = await this.kibanaDiscoveryService.getActiveKibanaNodes();
return nodes.map((node) => node.attributes.id);
}
}
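
A minimal usage sketch of the partitioner, assuming a `KibanaDiscoveryService` instance is already available (it is wired up this way in `plugin.ts` below). Because `getPartitions()` re-reads the active node list on every call, partition ownership rebalances automatically as Kibana nodes join or leave:

```ts
import { TaskPartitioner } from './task_partitioner';
import { KibanaDiscoveryService } from '../kibana_discovery_service';

async function partitionsForThisNode(
  podName: string,
  discoveryService: KibanaDiscoveryService
): Promise<number[]> {
  const taskPartitioner = new TaskPartitioner(podName, discoveryService);
  // The subset of the 256 partitions owned by this node, derived from the currently active nodes.
  return await taskPartitioner.getPartitions();
}
```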

View file

@ -42,6 +42,7 @@ import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { setupIntervalLogging } from './lib/log_health_metrics';
import { metricsStream, Metrics } from './metrics';
import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector';
import { TaskPartitioner } from './lib/task_partitioner';
export interface TaskManagerSetupContract {
/**
@ -281,6 +282,8 @@ export class TaskManagerPlugin
taskTypes: new Set(this.definitions.getAllTypes()),
excludedTypes: new Set(this.config.unsafe.exclude_task_types),
});
const taskPartitioner = new TaskPartitioner(this.taskManagerId!, this.kibanaDiscoveryService);
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
@ -292,6 +295,7 @@ export class TaskManagerPlugin
middleware: this.middleware,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
...managedConfiguration,
taskPartitioner,
});
this.ephemeralTaskLifecycle = new EphemeralTaskLifecycle({

View file

@ -20,6 +20,8 @@ import { asOk, Err, isErr, isOk, Result } from './lib/result_type';
import { FillPoolResult } from './lib/fill_pool';
import { ElasticsearchResponseError } from './lib/identify_es_error';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { TaskPartitioner } from './lib/task_partitioner';
import { KibanaDiscoveryService } from './kibana_discovery_service';
const executionContext = executionContextServiceMock.createSetupContract();
let mockTaskClaiming = taskClaimingMock.create({});
@ -91,6 +93,7 @@ describe('TaskPollingLifecycle', () => {
maxWorkersConfiguration$: of(100),
pollIntervalConfiguration$: of(100),
executionContext,
taskPartitioner: new TaskPartitioner('test', {} as KibanaDiscoveryService),
};
beforeEach(() => {

View file

@ -43,6 +43,7 @@ import { TaskTypeDictionary } from './task_type_dictionary';
import { delayOnClaimConflicts } from './polling';
import { TaskClaiming } from './queries/task_claiming';
import { ClaimOwnershipResult } from './task_claimers';
import { TaskPartitioner } from './lib/task_partitioner';
export interface ITaskEventEmitter<T> {
get events(): Observable<T>;
@ -58,6 +59,7 @@ export type TaskPollingLifecycleOpts = {
elasticsearchAndSOAvailability$: Observable<boolean>;
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
taskPartitioner: TaskPartitioner;
} & ManagedConfiguration;
export type TaskLifecycleEvent =
@ -109,6 +111,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
unusedTypes,
executionContext,
usageCounter,
taskPartitioner,
}: TaskPollingLifecycleOpts) {
this.logger = logger;
this.middleware = middleware;
@ -150,6 +153,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
0
)
: this.pool.availableWorkers,
taskPartitioner,
});
// pipe taskClaiming events into the lifecycle event stream
this.taskClaiming.events.subscribe(emitEvent);

View file

@ -17,6 +17,7 @@ import {
InactiveTasks,
RecognizedTask,
OneOfTaskTypes,
tasksWithPartitions,
} from './mark_available_tasks_as_claimed';
import { TaskTypeDictionary } from '../task_type_dictionary';
@ -266,4 +267,41 @@ if (doc['task.runAt'].size()!=0) {
}
`);
});
test('generates tasksWithPartitions clause as expected', () => {
expect(tasksWithPartitions([1, 2, 3])).toMatchInlineSnapshot(`
Object {
"bool": Object {
"filter": Array [
Object {
"bool": Object {
"should": Array [
Object {
"terms": Object {
"task.partition": Array [
1,
2,
3,
],
},
},
Object {
"bool": Object {
"must_not": Array [
Object {
"exists": Object {
"field": "task.partition",
},
},
],
},
},
],
},
},
],
},
}
`);
});
});

View file

@ -233,3 +233,34 @@ export const OneOfTaskTypes = (field: string, types: string[]): MustCondition =>
},
};
};
export function tasksWithPartitions(partitions: number[]): estypes.QueryDslQueryContainer {
return {
bool: {
filter: [
{
bool: {
should: [
{
terms: {
'task.partition': partitions,
},
},
{
bool: {
must_not: [
{
exists: {
field: 'task.partition',
},
},
],
},
},
],
},
},
],
},
};
}
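
Note the second `should` branch: tasks with no `partition` value (for example, documents written before this field existed) still match, so they remain claimable. The clause is combined into the claim query by the mget strategy; a condensed excerpt from the `strategy_mget.ts` hunk later in this diff (imports omitted):

```ts
// Inside searchAvailableTasks() in strategy_mget.ts.
const partitions = await taskPartitioner.getPartitions();
const query = matchesClauses(
  queryForScheduledTasks,
  filterDownBy(InactiveTasks),
  tasksWithPartitions(partitions)
);
```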

View file

@ -10,6 +10,8 @@ import { mockLogger } from '../test_utils';
import { TaskClaiming } from './task_claiming';
import { taskStoreMock } from '../task_store.mock';
import apm from 'elastic-apm-node';
import { TaskPartitioner } from '../lib/task_partitioner';
import { KibanaDiscoveryService } from '../kibana_discovery_service';
jest.mock('../constants', () => ({
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [
@ -23,6 +25,7 @@ jest.mock('../constants', () => ({
}));
const taskManagerLogger = mockLogger();
const taskPartitioner = new TaskPartitioner('test', {} as KibanaDiscoveryService);
beforeEach(() => jest.clearAllMocks());
@ -78,6 +81,7 @@ describe('TaskClaiming', () => {
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getCapacity: () => 10,
taskPartitioner,
});
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
@ -127,6 +131,7 @@ describe('TaskClaiming', () => {
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getCapacity: () => 10,
taskPartitioner,
});
expect(taskManagerLogger.info).toHaveBeenCalledTimes(2);

View file

@ -27,6 +27,7 @@ import {
ClaimOwnershipResult,
getTaskClaimer,
} from '../task_claimers';
import { TaskPartitioner } from '../lib/task_partitioner';
export type { ClaimOwnershipResult } from '../task_claimers';
export interface TaskClaimingOpts {
@ -38,6 +39,7 @@ export interface TaskClaimingOpts {
maxAttempts: number;
excludedTaskTypes: string[];
getCapacity: (taskType?: string) => number;
taskPartitioner: TaskPartitioner;
}
export interface OwnershipClaimingOpts {
@ -92,6 +94,7 @@ export class TaskClaiming {
private readonly excludedTaskTypes: string[];
private readonly unusedTypes: string[];
private readonly taskClaimer: TaskClaimerFn;
private readonly taskPartitioner: TaskPartitioner;
/**
* Constructs a new TaskStore.
@ -111,6 +114,7 @@ export class TaskClaiming {
this.unusedTypes = opts.unusedTypes;
this.taskClaimer = getTaskClaimer(this.logger, opts.strategy);
this.events$ = new Subject<TaskClaim>();
this.taskPartitioner = opts.taskPartitioner;
this.logger.info(`using task claiming strategy: ${opts.strategy}`);
}
@ -178,6 +182,7 @@ export class TaskClaiming {
taskMaxAttempts: this.taskMaxAttempts,
excludedTaskTypes: this.excludedTaskTypes,
logger: this.logger,
taskPartitioner: this.taskPartitioner,
};
return this.taskClaimer(opts).pipe(map((claimResult) => asOk(claimResult)));
}

View file

@ -62,6 +62,9 @@ export const taskMappings: SavedObjectsTypeMappingDefinition = {
ownerId: {
type: 'keyword',
},
partition: {
type: 'integer',
},
},
};

View file

@ -6,7 +6,7 @@
*/
import { SavedObjectsModelVersionMap } from '@kbn/core-saved-objects-server';
import { taskSchemaV1 } from '../schemas/task';
import { taskSchemaV1, taskSchemaV2 } from '../schemas/task';
export const taskModelVersions: SavedObjectsModelVersionMap = {
'1': {
@ -17,7 +17,22 @@ export const taskModelVersions: SavedObjectsModelVersionMap = {
},
],
schemas: {
forwardCompatibility: taskSchemaV1.extends({}, { unknowns: 'ignore' }),
create: taskSchemaV1,
},
},
'2': {
changes: [
{
type: 'mappings_addition',
addedMappings: {
partition: { type: 'integer' },
},
},
],
schemas: {
forwardCompatibility: taskSchemaV2.extends({}, { unknowns: 'ignore' }),
create: taskSchemaV2,
},
},
};

View file

@ -38,3 +38,7 @@ export const taskSchemaV1 = schema.object({
]),
version: schema.maybe(schema.string()),
});
export const taskSchemaV2 = taskSchemaV1.extends({
partition: schema.maybe(schema.number()),
});

View file

@ -328,6 +328,11 @@ export interface TaskInstance {
* Optionally override the timeout defined in the task type for this specific task instance
*/
timeoutOverride?: string;
/*
* Used to break up tasks so each Kibana node can claim tasks on a subset of the partitions
*/
partition?: number;
}
/**
@ -426,6 +431,11 @@ export interface ConcreteTaskInstance extends TaskInstance {
* The random uuid of the Kibana instance which claimed ownership of the task last
*/
ownerId: string | null;
/*
* Used to break up tasks so each Kibana node can claim tasks on a subset of the partitions
*/
partition?: number;
}
export interface ConcreteTaskInstanceVersion {
@ -460,4 +470,5 @@ export type SerializedConcreteTaskInstance = Omit<
startedAt: string | null;
retryAt: string | null;
runAt: string;
partition?: number;
};

View file

@ -16,6 +16,7 @@ import { ConcreteTaskInstance } from '../task';
import { claimAvailableTasksDefault } from './strategy_default';
import { claimAvailableTasksMget } from './strategy_mget';
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from '../config';
import { TaskPartitioner } from '../lib/task_partitioner';
export interface TaskClaimerOpts {
getCapacity: (taskType?: string | undefined) => number;
@ -28,6 +29,7 @@ export interface TaskClaimerOpts {
excludedTaskTypes: string[];
taskMaxAttempts: Record<string, number>;
logger: Logger;
taskPartitioner: TaskPartitioner;
}
export interface ClaimOwnershipResult {

View file

@ -28,6 +28,8 @@ import apm from 'elastic-apm-node';
import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running';
import { ClaimOwnershipResult } from '.';
import { FillPoolResult } from '../lib/fill_pool';
import { TaskPartitioner } from '../lib/task_partitioner';
import { KibanaDiscoveryService } from '../kibana_discovery_service';
jest.mock('../constants', () => ({
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [
@ -41,6 +43,7 @@ jest.mock('../constants', () => ({
}));
const taskManagerLogger = mockLogger();
const taskPartitioner = new TaskPartitioner('test', {} as KibanaDiscoveryService);
beforeEach(() => jest.clearAllMocks());
@ -131,6 +134,7 @@ describe('TaskClaiming', () => {
unusedTypes: unusedTaskTypes,
maxAttempts: taskClaimingOpts.maxAttempts ?? 2,
getCapacity: taskClaimingOpts.getCapacity ?? (() => 10),
taskPartitioner,
...taskClaimingOpts,
});
@ -1251,6 +1255,7 @@ if (doc['task.runAt'].size()!=0) {
taskStore,
maxAttempts: 2,
getCapacity,
taskPartitioner,
});
return { taskManagerId, runAt, taskClaiming };

View file

@ -16,7 +16,7 @@ import {
ConcreteTaskInstanceVersion,
TaskPriority,
} from '../task';
import { StoreOpts } from '../task_store';
import { SearchOpts, StoreOpts } from '../task_store';
import { asTaskClaimEvent, TaskEvent } from '../task_events';
import { asOk, isOk, unwrap } from '../lib/result_type';
import { TaskTypeDictionary } from '../task_type_dictionary';
@ -33,6 +33,16 @@ import apm from 'elastic-apm-node';
import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running';
import { ClaimOwnershipResult } from '.';
import { FillPoolResult } from '../lib/fill_pool';
import { TaskPartitioner } from '../lib/task_partitioner';
import type { MustNotCondition } from '../queries/query_clauses';
import {
createDiscoveryServiceMock,
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';
jest.mock('../lib/assign_pod_partitions', () => ({
assignPodPartitions: jest.fn().mockReturnValue([1, 3]),
}));
jest.mock('../constants', () => ({
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [
@ -80,6 +90,15 @@ const mockApmTrans = {
end: jest.fn(),
};
const discoveryServiceMock = createDiscoveryServiceMock('test');
const lastSeen = '2024-08-10T10:00:00.000Z';
discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([
createFindSO('test', lastSeen),
createFindSO('test-pod-2', lastSeen),
createFindSO('test-pod-3', lastSeen),
]);
const taskPartitioner = new TaskPartitioner('test', discoveryServiceMock);
// needs more tests similar to the `strategy_default.test.ts` test suite
describe('TaskClaiming', () => {
beforeEach(() => {
@ -138,6 +157,7 @@ describe('TaskClaiming', () => {
unusedTypes: unusedTaskTypes,
maxAttempts: taskClaimingOpts.maxAttempts ?? 2,
getCapacity: taskClaimingOpts.getCapacity ?? (() => 10),
taskPartitioner,
...taskClaimingOpts,
});
@ -183,17 +203,13 @@ describe('TaskClaiming', () => {
return unwrap(resultOrErr) as ClaimOwnershipResult;
});
expect(apm.startTransaction).toHaveBeenCalledWith(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(store.fetch.mock.calls).toMatchObject({});
expect(store.getDocVersions.mock.calls).toMatchObject({});
return results.map((result, index) => ({
result,
args: {},
args: {
search: store.fetch.mock.calls[index][0] as SearchOpts & {
query: MustNotCondition;
},
},
}));
}
@ -272,6 +288,151 @@ describe('TaskClaiming', () => {
});
expect(result).toMatchObject({});
});
test('it should filter for specific partitions and tasks without partitions', async () => {
const taskManagerId = uuidv4();
const [
{
args: {
search: { query },
},
},
] = await testClaimAvailableTasks({
storeOpts: {
taskManagerId,
},
taskClaimingOpts: {},
claimingOpts: {
claimOwnershipUntil: new Date(),
},
});
expect(query).toMatchInlineSnapshot(`
Object {
"bool": Object {
"filter": Array [
Object {
"bool": Object {
"should": Array [
Object {
"terms": Object {
"task.partition": Array [
1,
3,
],
},
},
Object {
"bool": Object {
"must_not": Array [
Object {
"exists": Object {
"field": "task.partition",
},
},
],
},
},
],
},
},
],
"must": Array [
Object {
"bool": Object {
"must": Array [
Object {
"term": Object {
"task.enabled": true,
},
},
],
},
},
Object {
"bool": Object {
"must": Array [
Object {
"terms": Object {
"task.taskType": Array [
"report",
"dernstraight",
"yawn",
],
},
},
],
},
},
Object {
"bool": Object {
"should": Array [
Object {
"bool": Object {
"must": Array [
Object {
"term": Object {
"task.status": "idle",
},
},
Object {
"range": Object {
"task.runAt": Object {
"lte": "now",
},
},
},
],
},
},
Object {
"bool": Object {
"must": Array [
Object {
"bool": Object {
"should": Array [
Object {
"term": Object {
"task.status": "running",
},
},
Object {
"term": Object {
"task.status": "claiming",
},
},
],
},
},
Object {
"range": Object {
"task.retryAt": Object {
"lte": "now",
},
},
},
],
},
},
],
},
},
Object {
"bool": Object {
"must_not": Array [
Object {
"term": Object {
"task.status": "unrecognized",
},
},
],
},
},
],
},
}
`);
});
});
describe('task events', () => {
@ -373,6 +534,7 @@ describe('TaskClaiming', () => {
taskStore,
maxAttempts: 2,
getCapacity,
taskPartitioner,
});
return { taskManagerId, runAt, taskClaiming };

View file

@ -36,10 +36,12 @@ import {
EnabledTask,
OneOfTaskTypes,
RecognizedTask,
tasksWithPartitions,
} from '../queries/mark_available_tasks_as_claimed';
import { TaskStore, SearchOpts } from '../task_store';
import { isOk, asOk } from '../lib/result_type';
import { TaskPartitioner } from '../lib/task_partitioner';
interface OwnershipClaimingOpts {
claimOwnershipUntil: Date;
@ -51,6 +53,7 @@ interface OwnershipClaimingOpts {
events$: Subject<TaskClaim>;
definitions: TaskTypeDictionary;
taskMaxAttempts: Record<string, number>;
taskPartitioner: TaskPartitioner;
}
const SIZE_MULTIPLIER_FOR_TASK_FETCH = 4;
@ -89,7 +92,7 @@ async function claimAvailableTasksApm(opts: TaskClaimerOpts): Promise<ClaimOwner
}
async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershipResult> {
const { getCapacity, claimOwnershipUntil, batches, events$, taskStore } = opts;
const { getCapacity, claimOwnershipUntil, batches, events$, taskStore, taskPartitioner } = opts;
const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts;
const { logger } = opts;
const loggerTag = claimAvailableTasksMget.name;
@ -111,6 +114,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
claimOwnershipUntil,
size: initialCapacity * SIZE_MULTIPLIER_FOR_TASK_FETCH,
taskMaxAttempts,
taskPartitioner,
});
if (docs.length === 0)
@ -267,7 +271,7 @@ async function searchAvailableTasks({
excludedTypes,
taskStore,
size,
taskMaxAttempts,
taskPartitioner,
}: OwnershipClaimingOpts): Promise<SearchAvailableTasksResponse> {
const searchedTypes = Array.from(taskTypes)
.concat(Array.from(removedTypes))
@ -283,9 +287,14 @@ async function searchAvailableTasks({
// must have a status that isn't 'unrecognized'
RecognizedTask
);
const partitions = await taskPartitioner.getPartitions();
const sort: NonNullable<SearchOpts['sort']> = getClaimSort(definitions);
const query = matchesClauses(queryForScheduledTasks, filterDownBy(InactiveTasks));
const query = matchesClauses(
queryForScheduledTasks,
filterDownBy(InactiveTasks),
tasksWithPartitions(partitions)
);
return await taskStore.fetch({
query,

View file

@ -160,6 +160,7 @@ describe('TaskStore', () => {
taskType: 'report',
user: undefined,
traceparent: 'apmTraceparent',
partition: 225,
},
{
id: 'id',
@ -183,6 +184,7 @@ describe('TaskStore', () => {
user: undefined,
version: '123',
traceparent: 'apmTraceparent',
partition: 225,
});
});
@ -490,6 +492,7 @@ describe('TaskStore', () => {
version: '123',
ownerId: null,
traceparent: 'myTraceparent',
partition: 99,
};
savedObjectsClient.update.mockImplementation(
@ -532,6 +535,7 @@ describe('TaskStore', () => {
user: undefined,
ownerId: null,
traceparent: 'myTraceparent',
partition: 99,
},
{ version: '123', refresh: false }
);
@ -1050,6 +1054,7 @@ describe('TaskStore', () => {
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
},
references: [],
version: '123',
@ -1089,6 +1094,7 @@ describe('TaskStore', () => {
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
},
},
],
@ -1113,6 +1119,7 @@ describe('TaskStore', () => {
user: undefined,
version: '123',
traceparent: 'apmTraceparent',
partition: 225,
},
]);
});

View file

@ -8,6 +8,8 @@
/*
* This module contains helpers for managing the task manager storage layer.
*/
import murmurhash from 'murmurhash';
import { v4 } from 'uuid';
import { Subject } from 'rxjs';
import { omit, defaults, get } from 'lodash';
import { SavedObjectError } from '@kbn/core-saved-objects-common';
@ -39,6 +41,7 @@ import {
import { TaskTypeDictionary } from './task_type_dictionary';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { TaskValidator } from './task_validator';
import { MAX_PARTITIONS } from './lib/task_partitioner';
export interface StoreOpts {
esClient: ElasticsearchClient;
@ -165,12 +168,13 @@ export class TaskStore {
let savedObject;
try {
const id = taskInstance.id || v4();
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
savedObject = await this.savedObjectsRepository.create<SerializedConcreteTaskInstance>(
'task',
taskInstanceToAttributes(validatedTaskInstance),
{ id: taskInstance.id, refresh: false }
taskInstanceToAttributes(validatedTaskInstance, id),
{ id, refresh: false }
);
if (get(taskInstance, 'schedule.interval', null) == null) {
this.adHocTaskCounter.increment();
@ -191,13 +195,14 @@ export class TaskStore {
*/
public async bulkSchedule(taskInstances: TaskInstance[]): Promise<ConcreteTaskInstance[]> {
const objects = taskInstances.map((taskInstance) => {
const id = taskInstance.id || v4();
this.definitions.ensureHas(taskInstance.taskType);
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
return {
type: 'task',
attributes: taskInstanceToAttributes(validatedTaskInstance),
id: taskInstance.id,
attributes: taskInstanceToAttributes(validatedTaskInstance, id),
id,
};
});
@ -252,7 +257,7 @@ export class TaskStore {
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
const attributes = taskInstanceToAttributes(taskInstance);
const attributes = taskInstanceToAttributes(taskInstance, doc.id);
let updatedSavedObject;
try {
@ -297,7 +302,7 @@ export class TaskStore {
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
attrsById.set(doc.id, taskInstanceToAttributes(taskInstance));
attrsById.set(doc.id, taskInstanceToAttributes(taskInstance, doc.id));
return attrsById;
}, new Map());
@ -622,7 +627,7 @@ export function correctVersionConflictsForContinuation(
return maxDocs && versionConflicts + updated > maxDocs ? maxDocs - updated : versionConflicts;
}
function taskInstanceToAttributes(doc: TaskInstance): SerializedConcreteTaskInstance {
function taskInstanceToAttributes(doc: TaskInstance, id: string): SerializedConcreteTaskInstance {
return {
...omit(doc, 'id', 'version'),
params: JSON.stringify(doc.params || {}),
@ -633,6 +638,7 @@ function taskInstanceToAttributes(doc: TaskInstance): SerializedConcreteTaskInst
retryAt: (doc.retryAt && doc.retryAt.toISOString()) || null,
runAt: (doc.runAt || new Date()).toISOString(),
status: (doc as ConcreteTaskInstance).status || 'idle',
partition: doc.partition || murmurhash.v3(id) % MAX_PARTITIONS,
} as SerializedConcreteTaskInstance;
}
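
The `taskInstanceToAttributes()` change above is where the partition is stamped onto the document, which is also why the id generation was hoisted in `schedule()` and `bulkSchedule()`: the hash needs the final document id. A small self-contained sketch of the derivation (the helper name is made up; the real logic is the single `doc.partition || murmurhash.v3(id) % MAX_PARTITIONS` expression above):

```ts
import murmurhash from 'murmurhash';

const MAX_PARTITIONS = 256;

// An explicitly provided partition wins; otherwise the partition is derived from the document id.
// The same id always yields the same partition, so a task does not move between partitions on update.
function partitionForTask(id: string, explicitPartition?: number): number {
  return explicitPartition || murmurhash.v3(id) % MAX_PARTITIONS;
}
```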

View file

@ -17,6 +17,7 @@ import {
} from '@kbn/core/server';
import { EventEmitter } from 'events';
import { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { BACKGROUND_TASK_NODE_SO_NAME } from '@kbn/task-manager-plugin/server/saved_objects';
const scope = 'testing';
const taskManagerQuery = {
@ -401,4 +402,40 @@ export function initRoutes(
}
}
);
router.post(
{
path: `/api/update_kibana_node`,
validate: {
body: schema.object({
id: schema.string(),
lastSeen: schema.string(),
}),
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<any, any, any, any>,
res: KibanaResponseFactory
): Promise<IKibanaResponse<any>> {
const { id, lastSeen } = req.body;
const client = (await context.core).savedObjects.getClient({
includedHiddenTypes: [BACKGROUND_TASK_NODE_SO_NAME],
});
const node = await client.update(
BACKGROUND_TASK_NODE_SO_NAME,
id,
{
id,
last_seen: lastSeen,
},
{ upsert: { id, last_seen: lastSeen }, refresh: false, retryOnConflict: 3 }
);
return res.ok({
body: node,
});
}
);
}

View file

@ -16,6 +16,7 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./task_management'));
loadTestFile(require.resolve('./task_management_scheduled_at'));
loadTestFile(require.resolve('./task_management_removed_types'));
loadTestFile(require.resolve('./task_partitions'));
loadTestFile(require.resolve('./migrations'));
});

View file

@ -0,0 +1,218 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { taskMappings as TaskManagerMapping } from '@kbn/task-manager-plugin/server/saved_objects/mappings';
import { asyncForEach } from '@kbn/std';
import { FtrProviderContext } from '../../ftr_provider_context';
const { properties: taskManagerIndexMapping } = TaskManagerMapping;
export interface RawDoc {
_id: string;
_source: any;
_type?: string;
}
export interface SearchResults {
hits: {
hits: RawDoc[];
};
}
type DeprecatedConcreteTaskInstance = Omit<ConcreteTaskInstance, 'schedule'> & {
interval: string;
};
type SerializedConcreteTaskInstance<State = string, Params = string> = Omit<
ConcreteTaskInstance,
'state' | 'params' | 'scheduledAt' | 'startedAt' | 'retryAt' | 'runAt'
> & {
state: State;
params: Params;
scheduledAt: string;
startedAt: string | null;
retryAt: string | null;
runAt: string;
};
export default function ({ getService }: FtrProviderContext) {
const es = getService('es');
const retry = getService('retry');
const supertest = getService('supertest');
const testHistoryIndex = '.kibana_task_manager_test_result';
const testNode1 = 'y-test-node';
const testNode2 = 'z-test-node';
function scheduleTask(
task: Partial<ConcreteTaskInstance | DeprecatedConcreteTaskInstance>
): Promise<SerializedConcreteTaskInstance> {
return supertest
.post('/api/sample_tasks/schedule')
.set('kbn-xsrf', 'xxx')
.send({ task })
.expect(200)
.then((response: { body: SerializedConcreteTaskInstance }) => response.body);
}
function currentTasks<State = unknown, Params = unknown>(): Promise<{
docs: Array<SerializedConcreteTaskInstance<State, Params>>;
}> {
return supertest
.get('/api/sample_tasks')
.expect(200)
.then((response) => response.body);
}
function updateKibanaNodes() {
const lastSeen = new Date().toISOString();
return Promise.all([
supertest
.post('/api/update_kibana_node')
.set('kbn-xsrf', 'xxx')
.send({ id: testNode1, lastSeen })
.expect(200),
supertest
.post('/api/update_kibana_node')
.set('kbn-xsrf', 'xxx')
.send({ id: testNode2, lastSeen })
.expect(200),
]);
}
async function historyDocs({
taskId,
taskType,
}: {
taskId?: string;
taskType?: string;
}): Promise<RawDoc[]> {
const filter: any[] = [{ term: { type: 'task' } }];
if (taskId) {
filter.push({ term: { taskId } });
}
if (taskType) {
filter.push({ term: { taskType } });
}
return es
.search({
index: testHistoryIndex,
body: {
query: {
bool: {
filter,
},
},
},
})
.then((result) => (result as unknown as SearchResults).hits.hits);
}
describe('task partitions', () => {
beforeEach(async () => {
const exists = await es.indices.exists({ index: testHistoryIndex });
if (exists) {
await es.deleteByQuery({
index: testHistoryIndex,
refresh: true,
body: { query: { term: { type: 'task' } } },
});
} else {
await es.indices.create({
index: testHistoryIndex,
body: {
mappings: {
properties: {
type: {
type: 'keyword',
},
taskId: {
type: 'keyword',
},
params: taskManagerIndexMapping.params,
state: taskManagerIndexMapping.state,
runAt: taskManagerIndexMapping.runAt,
} as Record<string, estypes.MappingProperty>,
},
},
});
}
});
afterEach(async () => {
await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
await es.deleteByQuery({
index: '.kibana_task_manager',
refresh: true,
body: { query: { terms: { id: [testNode1, testNode2] } } },
});
});
it('should run tasks with partitions assigned to this kibana node', async () => {
const partitions: Record<string, number> = {
'0': 127,
'1': 147,
'2': 23,
};
const tasksToSchedule = [];
for (let i = 0; i < 3; i++) {
tasksToSchedule.push(
scheduleTask({
id: `${i}`,
taskType: 'sampleTask',
schedule: { interval: `1d` },
params: {},
})
);
}
const scheduledTasks = await Promise.all(tasksToSchedule);
let tasks: any[] = [];
await retry.try(async () => {
tasks = (await currentTasks()).docs;
expect(tasks.length).to.eql(3);
});
const taskIds = tasks.map((task) => task.id);
await asyncForEach(scheduledTasks, async (scheduledTask) => {
expect(taskIds).to.contain(scheduledTask.id);
expect(scheduledTask.partition).to.eql(partitions[scheduledTask.id]);
let taskRanOnThisNode: boolean = false;
let counter = 0;
await retry.try(async () => {
await updateKibanaNodes();
const doc: RawDoc[] = await historyDocs({ taskId: scheduledTask.id });
if (doc.length === 1) {
taskRanOnThisNode = true;
return;
}
// we don't want the test to time out, so we check
// 20 times and then return
if (scheduledTask.id === '2' && counter > 20) {
return;
}
counter++;
throw new Error(`The task ID: ${scheduledTask.id} has not run yet`);
});
// taskId 2 should not run on this kibana node
if (scheduledTask.id === '2') {
expect(taskRanOnThisNode).to.be(false);
} else {
expect(taskRanOnThisNode).to.be(true);
}
});
});
});
}

View file

@ -23577,6 +23577,11 @@ murmurhash-js@^1.0.0:
resolved "https://registry.yarnpkg.com/murmurhash-js/-/murmurhash-js-1.0.0.tgz#b06278e21fc6c37fa5313732b0412bcb6ae15f51"
integrity sha1-sGJ44h/Gw3+lMTcysEEry2rhX1E=
murmurhash@^2.0.1:
version "2.0.1"
resolved "https://registry.yarnpkg.com/murmurhash/-/murmurhash-2.0.1.tgz#4097720e08cf978872194ad84ea5be2dec9b610f"
integrity sha512-5vQEh3y+DG/lMPM0mCGPDnyV8chYg/g7rl6v3Gd8WMF9S429ox3Xk8qrk174kWhG767KQMqqxLD1WnGd77hiew==
mustache@^2.3.2:
version "2.3.2"
resolved "https://registry.yarnpkg.com/mustache/-/mustache-2.3.2.tgz#a6d4d9c3f91d13359ab889a812954f9230a3d0c5"