mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 01:13:23 -04:00
Cleanup maxConcurrency usage and create an allow list for what task types can use it (#151930)
Similar to https://github.com/elastic/kibana/pull/144910 In this PR, I'm removing the `maxConcurrency` from the `apm-source-map-migration-task` task types given it only has a single task created for it. The concurrency setting limits how many tasks of such type a single Kibana process should handle at most, and internally requires a separate task claiming query to run every poll interval to claim those tasks. With this PR, task manager goes from running 3 update_by_query requests to 2 every 3 seconds, removing stress put onto Elasticsearch. For more details, see `maxConcurrency` here https://github.com/elastic/kibana/tree/main/x-pack/plugins/task_manager#task-definitions. I've also created an allow list of which task types can set a `maxConcurrency` given the consequences this has on system performance. --------- Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
b1ba7e1bb6
commit
97e85ac2c4
8 changed files with 66 additions and 3 deletions
|
@ -44,7 +44,6 @@ export async function scheduleSourceMapMigration({
|
|||
description: `Migrates fleet source map artifacts to "${APM_SOURCE_MAP_INDEX}" index`,
|
||||
timeout: '1h',
|
||||
maxAttempts: 5,
|
||||
maxConcurrency: 1,
|
||||
createTaskRunner() {
|
||||
const taskState: TaskState = { isAborted: false };
|
||||
|
||||
|
|
|
@ -5,3 +5,13 @@
|
|||
* 2.0.
|
||||
*/
|
||||
export const TASK_MANAGER_INDEX = '.kibana_task_manager';
|
||||
export const CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: string[] = [
|
||||
// for testing
|
||||
'sampleTaskWithSingleConcurrency',
|
||||
'sampleTaskWithLimitedConcurrency',
|
||||
'timedTaskWithSingleConcurrency',
|
||||
'timedTaskWithLimitedConcurrency',
|
||||
|
||||
// task types requiring a concurrency
|
||||
'report:execute',
|
||||
];
|
||||
|
|
|
@ -22,6 +22,10 @@ import { TaskPoolMock } from './task_pool.mock';
|
|||
import { executionContextServiceMock } from '@kbn/core/server/mocks';
|
||||
import { taskManagerMock } from './mocks';
|
||||
|
||||
jest.mock('./constants', () => ({
|
||||
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: ['report'],
|
||||
}));
|
||||
|
||||
const executionContext = executionContextServiceMock.createSetupContract();
|
||||
|
||||
describe('EphemeralTaskLifecycle', () => {
|
||||
|
|
|
@ -31,6 +31,10 @@ jest.mock('./queries/task_claiming', () => {
|
|||
};
|
||||
});
|
||||
|
||||
jest.mock('./constants', () => ({
|
||||
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: ['report', 'quickReport'],
|
||||
}));
|
||||
|
||||
describe('TaskPollingLifecycle', () => {
|
||||
let clock: sinon.SinonFakeTimers;
|
||||
const taskManagerLogger = mockLogger();
|
||||
|
|
|
@ -27,6 +27,17 @@ import { taskStoreMock } from '../task_store.mock';
|
|||
import apm from 'elastic-apm-node';
|
||||
import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running';
|
||||
|
||||
jest.mock('../constants', () => ({
|
||||
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [
|
||||
'limitedToZero',
|
||||
'limitedToOne',
|
||||
'anotherLimitedToZero',
|
||||
'anotherLimitedToOne',
|
||||
'limitedToTwo',
|
||||
'limitedToFive',
|
||||
],
|
||||
}));
|
||||
|
||||
const taskManagerLogger = mockLogger();
|
||||
|
||||
beforeEach(() => jest.clearAllMocks());
|
||||
|
|
|
@ -27,6 +27,10 @@ jest.mock('uuid', () => ({
|
|||
v4: () => 'v4uuid',
|
||||
}));
|
||||
|
||||
jest.mock('./constants', () => ({
|
||||
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: ['foo'],
|
||||
}));
|
||||
|
||||
jest.mock('elastic-apm-node', () => ({
|
||||
currentTraceparent: 'parent',
|
||||
currentTransaction: {
|
||||
|
|
|
@ -14,6 +14,10 @@ import {
|
|||
TaskTypeDictionary,
|
||||
} from './task_type_dictionary';
|
||||
|
||||
jest.mock('./constants', () => ({
|
||||
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: ['foo'],
|
||||
}));
|
||||
|
||||
interface Opts {
|
||||
numTasks: number;
|
||||
}
|
||||
|
@ -182,7 +186,6 @@ describe('taskTypeDictionary', () => {
|
|||
definitions.registerTaskDefinitions({
|
||||
foo: {
|
||||
title: 'foo',
|
||||
maxConcurrency: 2,
|
||||
createTaskRunner: jest.fn(),
|
||||
},
|
||||
});
|
||||
|
@ -209,5 +212,19 @@ describe('taskTypeDictionary', () => {
|
|||
`"Task sampleTaskRemovedType has been removed from registration!"`
|
||||
);
|
||||
});
|
||||
|
||||
it(`throws error when setting maxConcurrency to a task type that isn't allowed to set it`, () => {
|
||||
expect(() => {
|
||||
definitions.registerTaskDefinitions({
|
||||
foo2: {
|
||||
title: 'foo2',
|
||||
maxConcurrency: 2,
|
||||
createTaskRunner: jest.fn(),
|
||||
},
|
||||
});
|
||||
}).toThrowErrorMatchingInlineSnapshot(
|
||||
`"maxConcurrency setting isn't allowed for task type: foo2"`
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
import { Logger } from '@kbn/core/server';
|
||||
import { TaskDefinition, taskDefinitionSchema, TaskRunCreatorFunction } from './task';
|
||||
import { CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE } from './constants';
|
||||
|
||||
/**
|
||||
* Types that are no longer registered and will be marked as unregistered
|
||||
|
@ -129,12 +130,25 @@ export class TaskTypeDictionary {
|
|||
throw new Error(`Task ${removed} has been removed from registration!`);
|
||||
}
|
||||
|
||||
for (const taskType of Object.keys(taskDefinitions)) {
|
||||
if (
|
||||
taskDefinitions[taskType].maxConcurrency !== undefined &&
|
||||
!CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE.includes(taskType)
|
||||
) {
|
||||
// maxConcurrency is designed to limit how many tasks of the same type a single Kibana
|
||||
// instance should run at a time. Meaning if you have 8 Kibanas running, you will still
|
||||
// see up to 8 tasks running at a time but one per Kibana instance. This is helpful for
|
||||
// reporting purposes but not for many other cases and are better off not setting this value.
|
||||
throw new Error(`maxConcurrency setting isn't allowed for task type: ${taskType}`);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
for (const definition of sanitizeTaskDefinitions(taskDefinitions)) {
|
||||
this.definitions.set(definition.type, definition);
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.error('Could not sanitize task definitions');
|
||||
this.logger.error(`Could not sanitize task definitions: ${e.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue