[Response Ops][Task Manager] Dynamically set capacity for cloud deployments (#189117)

Followup to https://github.com/elastic/kibana/pull/187999

## Summary

Dynamically set capacity for cloud deployments if claim strategy is
`mget`

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2024-07-26 09:32:07 -04:00 committed by GitHub
parent 6b4a58a163
commit b693df92ff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 310 additions and 27 deletions

View file

@ -11,6 +11,8 @@
"task_manager"
],
"optionalPlugins": [
"cloud",
"serverless",
"usageCollection"
]
}

View file

@ -101,7 +101,7 @@ describe('managed configuration', () => {
esStart.client.asInternalUser as unknown as Client
);
coreStart.savedObjects.createInternalRepository.mockReturnValue(savedObjectsClient);
taskManagerStart = await taskManager.start(coreStart);
taskManagerStart = await taskManager.start(coreStart, {});
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
@ -218,7 +218,7 @@ describe('managed configuration', () => {
esStart.client.asInternalUser as unknown as Client
);
coreStart.savedObjects.createInternalRepository.mockReturnValue(savedObjectsClient);
taskManagerStart = await taskManager.start(coreStart);
taskManagerStart = await taskManager.start(coreStart, {});
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
@ -338,7 +338,7 @@ describe('managed configuration', () => {
esStart.client.asInternalUser as unknown as Client
);
coreStart.savedObjects.createInternalRepository.mockReturnValue(savedObjectsClient);
taskManagerStart = await taskManager.start(coreStart);
taskManagerStart = await taskManager.start(coreStart, {});
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall

View file

@ -13,12 +13,7 @@ import {
ADJUST_THROUGHPUT_INTERVAL,
} from './create_managed_configuration';
import { mockLogger } from '../test_utils';
import {
CLAIM_STRATEGY_DEFAULT,
CLAIM_STRATEGY_MGET,
DEFAULT_CAPACITY,
TaskManagerConfig,
} from '../config';
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
@ -88,10 +83,11 @@ describe('createManagedConfiguration()', () => {
expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2);
});
test('uses DEFAULT_CAPACITY if neither capacity nor max_workers is defined', async () => {
test('uses provided defaultCapacity if neither capacity nor max_workers is defined', async () => {
const capacitySubscription = jest.fn();
const pollIntervalSubscription = jest.fn();
const { capacityConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
defaultCapacity: 500,
logger,
errors$: new Subject<Error>(),
config: {
@ -101,7 +97,7 @@ describe('createManagedConfiguration()', () => {
capacityConfiguration$.subscribe(capacitySubscription);
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
expect(capacitySubscription).toHaveBeenCalledTimes(1);
expect(capacitySubscription).toHaveBeenNthCalledWith(1, DEFAULT_CAPACITY);
expect(capacitySubscription).toHaveBeenNthCalledWith(1, 500);
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2);
});

View file

@ -39,8 +39,9 @@ const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2;
interface ManagedConfigurationOpts {
config: TaskManagerConfig;
logger: Logger;
defaultCapacity?: number;
errors$: Observable<Error>;
logger: Logger;
}
export interface ManagedConfiguration {
@ -51,11 +52,12 @@ export interface ManagedConfiguration {
export function createManagedConfiguration({
config,
defaultCapacity = DEFAULT_CAPACITY,
logger,
errors$,
}: ManagedConfigurationOpts): ManagedConfiguration {
const errorCheck$ = countErrors(errors$, ADJUST_THROUGHPUT_INTERVAL);
const startingCapacity = calculateStartingCapacity(config, logger);
const startingCapacity = calculateStartingCapacity(config, logger, defaultCapacity);
const startingPollInterval = config.poll_interval;
return {
startingCapacity,
@ -210,7 +212,11 @@ function getMinCapacity(config: TaskManagerConfig) {
}
}
export function calculateStartingCapacity(config: TaskManagerConfig, logger: Logger): number {
export function calculateStartingCapacity(
config: TaskManagerConfig,
logger: Logger,
defaultCapacity: number
): number {
if (config.capacity !== undefined && config.max_workers !== undefined) {
logger.warn(
`Both "xpack.task_manager.capacity" and "xpack.task_manager.max_workers" configs are set, max_workers will be ignored in favor of capacity and the setting should be removed.`
@ -225,6 +231,6 @@ export function calculateStartingCapacity(config: TaskManagerConfig, logger: Log
return Math.min(config.max_workers, MAX_CAPACITY);
}
// Neither are set, use DEFAULT CAPACITY
return DEFAULT_CAPACITY;
// Neither are set, use the given default capacity
return defaultCapacity;
}

View file

@ -0,0 +1,185 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY } from '../config';
import { getDefaultCapacity } from './get_default_capacity';
describe('getDefaultCapacity', () => {
it('returns default capacity when not in cloud', () => {
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: false,
isServerless: false,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(DEFAULT_CAPACITY);
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: false,
isServerless: true,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(DEFAULT_CAPACITY);
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: false,
isServerless: false,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(DEFAULT_CAPACITY);
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: false,
isServerless: true,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(DEFAULT_CAPACITY);
});
it('returns default capacity when default claim strategy', () => {
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_DEFAULT,
})
).toBe(DEFAULT_CAPACITY);
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_DEFAULT,
})
).toBe(DEFAULT_CAPACITY);
});
it('returns default capacity when serverless', () => {
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: false,
isServerless: true,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(DEFAULT_CAPACITY);
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: false,
isServerless: true,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(DEFAULT_CAPACITY);
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: true,
isServerless: true,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(DEFAULT_CAPACITY);
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: true,
isServerless: true,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(DEFAULT_CAPACITY);
});
it('returns capacity as expected when in cloud and claim strategy is mget', () => {
// 1GB
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(10);
// 1GB but somehow background task node only is true
expect(
getDefaultCapacity({
heapSizeLimit: 851443712,
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(10);
// 2GB
expect(
getDefaultCapacity({
heapSizeLimit: 1702887424,
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(15);
// 2GB but somehow background task node only is true
expect(
getDefaultCapacity({
heapSizeLimit: 1702887424,
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(15);
// 4GB
expect(
getDefaultCapacity({
heapSizeLimit: 3405774848,
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(25);
// 4GB background task only
expect(
getDefaultCapacity({
heapSizeLimit: 3405774848,
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_MGET,
})
).toBe(50);
});
});

View file

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY } from '../config';
interface GetDefaultCapacityOpts {
claimStrategy?: string;
heapSizeLimit: number;
isCloud: boolean;
isServerless: boolean;
isBackgroundTaskNodeOnly: boolean;
}
// Map instance size to desired capacity
const HEAP_TO_CAPACITY_MAP = [
{ minHeap: 0, maxHeap: 1, capacity: 10 },
{ minHeap: 1, maxHeap: 2, capacity: 15 },
{ minHeap: 2, maxHeap: 4, capacity: 25, backgroundTaskNodeOnly: false },
{ minHeap: 2, maxHeap: 4, capacity: 50, backgroundTaskNodeOnly: true },
];
export function getDefaultCapacity({
claimStrategy,
heapSizeLimit: heapSizeLimitInBytes,
isCloud,
isServerless,
isBackgroundTaskNodeOnly,
}: GetDefaultCapacityOpts) {
// perform heap size based calculations only in cloud
if (isCloud && !isServerless && claimStrategy === CLAIM_STRATEGY_MGET) {
// convert bytes to GB
const heapSizeLimitInGB = heapSizeLimitInBytes / 1e9;
const config = HEAP_TO_CAPACITY_MAP.find((map) => {
return (
heapSizeLimitInGB > map.minHeap &&
heapSizeLimitInGB <= map.maxHeap &&
(map.backgroundTaskNodeOnly === undefined ||
isBackgroundTaskNodeOnly === map.backgroundTaskNodeOnly)
);
});
return config?.capacity ?? DEFAULT_CAPACITY;
}
return DEFAULT_CAPACITY;
}

View file

@ -11,6 +11,8 @@ import { TaskManagerConfig } from './config';
import { Subject } from 'rxjs';
import { bufferCount, take } from 'rxjs';
import { CoreStatus, ServiceStatusLevels } from '@kbn/core/server';
import { serverlessPluginMock } from '@kbn/serverless/server/mocks';
import { cloudMock } from '@kbn/cloud-plugin/public/mocks';
import { taskPollingLifecycleMock } from './polling_lifecycle.mock';
import { TaskPollingLifecycle } from './polling_lifecycle';
import type { TaskPollingLifecycle as TaskPollingLifecycleClass } from './polling_lifecycle';
@ -147,7 +149,10 @@ describe('TaskManagerPlugin', () => {
pluginInitializerContext.node.roles.backgroundTasks = true;
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
taskManagerPlugin.setup(coreMock.createSetup(), { usageCollection: undefined });
taskManagerPlugin.start(coreStart);
taskManagerPlugin.start(coreStart, {
serverless: serverlessPluginMock.createStartContract(),
cloud: cloudMock.createStart(),
});
expect(TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>).toHaveBeenCalledTimes(1);
expect(
@ -162,7 +167,10 @@ describe('TaskManagerPlugin', () => {
pluginInitializerContext.node.roles.backgroundTasks = false;
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
taskManagerPlugin.setup(coreMock.createSetup(), { usageCollection: undefined });
taskManagerPlugin.start(coreStart);
taskManagerPlugin.start(coreStart, {
serverless: serverlessPluginMock.createStartContract(),
cloud: cloudMock.createStart(),
});
expect(TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>).not.toHaveBeenCalled();
expect(

View file

@ -18,6 +18,8 @@ import {
ServiceStatusLevels,
CoreStatus,
} from '@kbn/core/server';
import { ServerlessPluginStart } from '@kbn/serverless/server';
import type { CloudStart } from '@kbn/cloud-plugin/server';
import {
registerDeleteInactiveNodesTaskDefinition,
scheduleDeleteInactiveNodesTaskDefinition,
@ -43,6 +45,7 @@ import { setupIntervalLogging } from './lib/log_health_metrics';
import { metricsStream, Metrics } from './metrics';
import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector';
import { TaskPartitioner } from './lib/task_partitioner';
import { getDefaultCapacity } from './lib/get_default_capacity';
export interface TaskManagerSetupContract {
/**
@ -76,6 +79,11 @@ export type TaskManagerStartContract = Pick<
getRegisteredTypes: () => string[];
};
export interface TaskManagerPluginStart {
cloud?: CloudStart;
serverless?: ServerlessPluginStart;
}
const LogHealthForBackgroundTasksOnlyMinutes = 60;
export class TaskManagerPlugin
@ -99,6 +107,7 @@ export class TaskManagerPlugin
private taskManagerMetricsCollector?: TaskManagerMetricsCollector;
private nodeRoles: PluginInitializerContext['node']['roles'];
private kibanaDiscoveryService?: KibanaDiscoveryService;
private heapSizeLimit: number = 0;
constructor(private readonly initContext: PluginInitializerContext) {
this.initContext = initContext;
@ -122,6 +131,13 @@ export class TaskManagerPlugin
): TaskManagerSetupContract {
this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$);
core.metrics
.getOpsMetrics$()
.pipe(distinctUntilChanged())
.subscribe((metrics) => {
this.heapSizeLimit = metrics.process.memory.heap.size_limit;
});
setupSavedObjects(core.savedObjects, this.config);
this.taskManagerId = this.initContext.env.instanceUuid;
@ -232,12 +248,10 @@ export class TaskManagerPlugin
};
}
public start({
savedObjects,
elasticsearch,
executionContext,
docLinks,
}: CoreStart): TaskManagerStartContract {
public start(
{ savedObjects, elasticsearch, executionContext, docLinks }: CoreStart,
{ cloud, serverless }: TaskManagerPluginStart
): TaskManagerStartContract {
const savedObjectsRepository = savedObjects.createInternalRepository([
TASK_SO_NAME,
BACKGROUND_TASK_NODE_SO_NAME,
@ -267,10 +281,29 @@ export class TaskManagerPlugin
requestTimeouts: this.config.request_timeouts,
});
const defaultCapacity = getDefaultCapacity({
claimStrategy: this.config?.claim_strategy,
heapSizeLimit: this.heapSizeLimit,
isCloud: cloud?.isCloudEnabled ?? false,
isServerless: !!serverless,
isBackgroundTaskNodeOnly: this.isNodeBackgroundTasksOnly(),
});
this.logger.info(
`Task manager isCloud=${
cloud?.isCloudEnabled ?? false
} isServerless=${!!serverless} claimStrategy=${
this.config!.claim_strategy
} isBackgroundTaskNodeOnly=${this.isNodeBackgroundTasksOnly()} heapSizeLimit=${
this.heapSizeLimit
} defaultCapacity=${defaultCapacity}`
);
const managedConfiguration = createManagedConfiguration({
logger: this.logger,
errors$: taskStore.errors$,
config: this.config!,
errors$: taskStore.errors$,
defaultCapacity,
logger: this.logger,
});
// Only poll for tasks if configured to run tasks

View file

@ -25,7 +25,9 @@
"@kbn/alerting-state-types",
"@kbn/core-saved-objects-api-server",
"@kbn/logging",
"@kbn/core-lifecycle-server"
"@kbn/core-lifecycle-server",
"@kbn/serverless",
"@kbn/cloud-plugin"
],
"exclude": ["target/**/*"]
}