[8.x] [Response Ops][Task Manager] Onboard 12.5% of ECH clusters to use `mget` task claiming (#196317) (#196460)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Response Ops][Task Manager] Onboard 12.5% of ECH clusters to use
`mget` task claiming
(#196317)](https://github.com/elastic/kibana/pull/196317)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Ying
Mao","email":"ying.mao@elastic.co"},"sourceCommit":{"committedDate":"2024-10-16T00:24:52Z","message":"[Response
Ops][Task Manager] Onboard 12.5% of ECH clusters to use `mget` task
claiming (#196317)\n\nResolves
https://github.com/elastic/response-ops-team/issues/239\r\n\r\n##
Summary\r\n\r\nDeployed to cloud: deployment ID was
`ab4e88d139f93d43024837d96144e7d4`.\r\nSince the deployment ID starts
with an `a`, this should start with\r\n`mget` and I can see in the logs
with the latest push that this is true\r\n\r\n<img width=\"2190\"
alt=\"Screenshot 2024-10-15 at 2 59
20 PM\"\r\nsrc=\"https://github.com/user-attachments/assets/079bc4d8-365e-4ba6-b7a9-59fe506283d9\">\r\n\r\n\r\nDeployed
to serverless: project ID was\r\n`d33d22a94ce246d091220eace2c4e4bb`. See
in the logs: `Using claim\r\nstrategy mget as configured for
deployment\r\nd33d22a94ce246d091220eace2c4e4bb`\r\n\r\nCo-authored-by:
Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"267efdf31fe9ae314b0bed99bc23db5452a2aaa3","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Feature:Task
Manager","Team:ResponseOps","v9.0.0","backport:prev-minor","ci:cloud-deploy","ci:project-deploy-elasticsearch","v8.16.0"],"title":"[Response
Ops][Task Manager] Onboard 12.5% of ECH clusters to use `mget` task
claiming","number":196317,"url":"https://github.com/elastic/kibana/pull/196317","mergeCommit":{"message":"[Response
Ops][Task Manager] Onboard 12.5% of ECH clusters to use `mget` task
claiming (#196317)\n\nResolves
https://github.com/elastic/response-ops-team/issues/239\r\n\r\n##
Summary\r\n\r\nDeployed to cloud: deployment ID was
`ab4e88d139f93d43024837d96144e7d4`.\r\nSince the deployment ID starts
with an `a`, this should start with\r\n`mget` and I can see in the logs
with the latest push that this is true\r\n\r\n<img width=\"2190\"
alt=\"Screenshot 2024-10-15 at 2 59
20 PM\"\r\nsrc=\"https://github.com/user-attachments/assets/079bc4d8-365e-4ba6-b7a9-59fe506283d9\">\r\n\r\n\r\nDeployed
to serverless: project ID was\r\n`d33d22a94ce246d091220eace2c4e4bb`. See
in the logs: `Using claim\r\nstrategy mget as configured for
deployment\r\nd33d22a94ce246d091220eace2c4e4bb`\r\n\r\nCo-authored-by:
Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"267efdf31fe9ae314b0bed99bc23db5452a2aaa3"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/196317","number":196317,"mergeCommit":{"message":"[Response
Ops][Task Manager] Onboard 12.5% of ECH clusters to use `mget` task
claiming (#196317)\n\nResolves
https://github.com/elastic/response-ops-team/issues/239\r\n\r\n##
Summary\r\n\r\nDeployed to cloud: deployment ID was
`ab4e88d139f93d43024837d96144e7d4`.\r\nSince the deployment ID starts
with an `a`, this should start with\r\n`mget` and I can see in the logs
with the latest push that this is true\r\n\r\n<img width=\"2190\"
alt=\"Screenshot 2024-10-15 at 2 59
20 PM\"\r\nsrc=\"https://github.com/user-attachments/assets/079bc4d8-365e-4ba6-b7a9-59fe506283d9\">\r\n\r\n\r\nDeployed
to serverless: project ID was\r\n`d33d22a94ce246d091220eace2c4e4bb`. See
in the logs: `Using claim\r\nstrategy mget as configured for
deployment\r\nd33d22a94ce246d091220eace2c4e4bb`\r\n\r\nCo-authored-by:
Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"267efdf31fe9ae314b0bed99bc23db5452a2aaa3"}},{"branch":"8.x","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Ying Mao <ying.mao@elastic.co>
This commit is contained in:
Kibana Machine 2024-10-16 13:10:18 +11:00 committed by GitHub
parent 34b0772076
commit 0c1333301d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 294 additions and 19 deletions

View file

@ -14,7 +14,6 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
@ -77,7 +76,6 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
@ -138,7 +136,6 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,

View file

@ -202,7 +202,7 @@ export const configSchema = schema.object(
max: 100,
min: 1,
}),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_UPDATE_BY_QUERY }),
claim_strategy: schema.maybe(schema.string()),
request_timeouts: requestTimeoutsConfig,
auto_calculate_default_ech_capacity: schema.boolean({ defaultValue: false }),
},

View file

@ -0,0 +1,197 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
CLAIM_STRATEGY_MGET,
CLAIM_STRATEGY_UPDATE_BY_QUERY,
DEFAULT_POLL_INTERVAL,
MGET_DEFAULT_POLL_INTERVAL,
} from '../config';
import { mockLogger } from '../test_utils';
import { setClaimStrategy } from './set_claim_strategy';
/**
 * Builds a fresh task manager config fixture that intentionally has no
 * `claim_strategy` key, so tests can exercise the code path where the
 * strategy has to be derived. A new object is returned on every call.
 */
function getConfigWithoutClaimStrategy() {
  // Health-stats logging settings.
  const healthVerboseLog = {
    enabled: false,
    level: 'debug' as const,
    warn_delayed_task_start_in_seconds: 60,
  };

  // Execution thresholds: one default entry and no per-type overrides.
  const executionThresholds = {
    default: {
      error_threshold: 90,
      warn_threshold: 80,
    },
    custom: {},
  };

  return {
    discovery: { active_nodes_lookback: '30s', interval: 10000 },
    kibanas_per_partition: 2,
    capacity: 10,
    max_attempts: 9,
    allow_reading_invalid_state: false,
    version_conflict_threshold: 80,
    monitored_aggregated_stats_refresh_rate: 60000,
    monitored_stats_health_verbose_log: healthVerboseLog,
    monitored_stats_required_freshness: 4000,
    monitored_stats_running_average_window: 50,
    request_capacity: 1000,
    monitored_task_execution_thresholds: executionThresholds,
    ephemeral_tasks: { enabled: true, request_capacity: 10 },
    unsafe: {
      exclude_task_types: [],
      authenticate_background_task_utilization: true,
    },
    event_loop_delay: { monitor: true, warn_threshold: 5000 },
    worker_utilization_running_average_window: 5,
    metrics_reset_interval: 3000,
    request_timeouts: { update_by_query: 1000 },
    // Starts at the default interval so tests can see whether it gets replaced.
    poll_interval: DEFAULT_POLL_INTERVAL,
    auto_calculate_default_ech_capacity: false,
  };
}
// Shared mock logger; its recorded calls are cleared before every test (see beforeEach).
const logger = mockLogger();
// Starts with 'd', i.e. outside the 'a'/'b' prefix bucket that is onboarded to mget.
const deploymentIdUpdateByQuery = 'd2f0e7c6bc464a9b8b16e5730b9c40f9';
// Starts with 'a', so a cloud (non-serverless) deployment with this ID defaults to mget.
const deploymentIdMget = 'ab4e88d139f93d43024837d96144e7d4';

/**
 * Unit tests for `setClaimStrategy`, covering:
 *  - an explicitly configured strategy is returned untouched in every environment,
 *  - serverless always resolves to mget,
 *  - cloud (non-serverless) resolves by deployment ID prefix,
 *  - everything else falls back to update_by_query.
 */
describe('setClaimStrategy', () => {
  beforeEach(() => {
    // The logger is shared across tests, so reset its call history each time.
    jest.clearAllMocks();
  });

  // An explicit claim strategy must win regardless of environment or deployment ID.
  for (const isServerless of [true, false]) {
    for (const isCloud of [true, false]) {
      for (const deploymentId of [undefined, deploymentIdMget, deploymentIdUpdateByQuery]) {
        for (const configuredStrategy of [CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY]) {
          // The title includes configuredStrategy so that each generated test has a
          // unique name; otherwise both strategies would register identical titles and
          // a failure could not be attributed to a specific combination.
          test(`should return config as is when claim strategy is already defined: isServerless=${isServerless}, isCloud=${isCloud}, deploymentId=${deploymentId}, configuredStrategy=${configuredStrategy}`, () => {
            const config = {
              ...getConfigWithoutClaimStrategy(),
              claim_strategy: configuredStrategy,
            };

            const returnedConfig = setClaimStrategy({
              config,
              logger,
              isCloud,
              isServerless,
              deploymentId,
            });

            expect(returnedConfig).toStrictEqual(config);
            if (deploymentId) {
              expect(logger.info).toHaveBeenCalledWith(
                `Using claim strategy ${configuredStrategy} as configured for deployment ${deploymentId}`
              );
            } else {
              expect(logger.info).toHaveBeenCalledWith(
                `Using claim strategy ${configuredStrategy} as configured`
              );
            }
          });
        }
      }
    }
  }

  // Serverless always resolves to mget, whatever the cloud flag or deployment ID.
  for (const isCloud of [true, false]) {
    for (const deploymentId of [undefined, deploymentIdMget, deploymentIdUpdateByQuery]) {
      test(`should set claim strategy to mget if in serverless: isCloud=${isCloud}, deploymentId=${deploymentId}`, () => {
        const config = getConfigWithoutClaimStrategy();

        const returnedConfig = setClaimStrategy({
          config,
          logger,
          isCloud,
          isServerless: true,
          deploymentId,
        });

        expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET);
        expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL);
        if (deploymentId) {
          expect(logger.info).toHaveBeenCalledWith(
            `Setting claim strategy to mget for serverless deployment ${deploymentId}`
          );
        } else {
          expect(logger.info).toHaveBeenCalledWith(`Setting claim strategy to mget`);
        }
      });
    }
  }

  test(`should set claim strategy to update_by_query if not cloud and not serverless`, () => {
    const config = getConfigWithoutClaimStrategy();

    const returnedConfig = setClaimStrategy({
      config,
      logger,
      isCloud: false,
      isServerless: false,
    });

    expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
    expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);
    // The silent fallback path does not log anything.
    expect(logger.info).not.toHaveBeenCalled();
  });

  test(`should set claim strategy to update_by_query if cloud and not serverless with undefined deploymentId`, () => {
    const config = getConfigWithoutClaimStrategy();

    const returnedConfig = setClaimStrategy({
      config,
      logger,
      isCloud: true,
      isServerless: false,
    });

    expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
    expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);
    // Without a deployment ID there is no prefix to inspect, so nothing is logged.
    expect(logger.info).not.toHaveBeenCalled();
  });

  test(`should set claim strategy to update_by_query if cloud and not serverless and deploymentId does not start with a or b`, () => {
    const config = getConfigWithoutClaimStrategy();

    const returnedConfig = setClaimStrategy({
      config,
      logger,
      isCloud: true,
      isServerless: false,
      deploymentId: deploymentIdUpdateByQuery,
    });

    expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
    expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);
    expect(logger.info).toHaveBeenCalledWith(
      `Setting claim strategy to update_by_query for deployment ${deploymentIdUpdateByQuery}`
    );
  });

  test(`should set claim strategy to mget if cloud and not serverless and deploymentId starts with a or b`, () => {
    const config = getConfigWithoutClaimStrategy();

    const returnedConfig = setClaimStrategy({
      config,
      logger,
      isCloud: true,
      isServerless: false,
      deploymentId: deploymentIdMget,
    });

    expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET);
    expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL);
    expect(logger.info).toHaveBeenCalledWith(
      `Setting claim strategy to mget for deployment ${deploymentIdMget}`
    );
  });
});

View file

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from '@kbn/core/server';
import {
CLAIM_STRATEGY_MGET,
CLAIM_STRATEGY_UPDATE_BY_QUERY,
DEFAULT_POLL_INTERVAL,
MGET_DEFAULT_POLL_INTERVAL,
TaskManagerConfig,
} from '../config';
/**
 * Inputs used by `setClaimStrategy` to decide which task claiming strategy
 * the task manager should run with.
 */
interface SetClaimStrategyOpts {
  /** Task manager config; when its `claim_strategy` is already set it is used as is. */
  config: TaskManagerConfig;
  /** Logger that receives an info message describing the chosen strategy. */
  logger: Logger;
  /** True when running as a serverless build; serverless always uses mget. */
  isServerless: boolean;
  /** True when running on cloud; enables selection by deployment ID prefix. */
  isCloud: boolean;
  /** Cloud deployment ID, when known; used for prefix-based selection and in log messages. */
  deploymentId?: string;
}
/**
 * Resolves the task claiming strategy, and the poll interval that goes with it,
 * for this Kibana instance.
 *
 * Resolution order:
 *  1. A `claim_strategy` already present in the config wins; the config is
 *     returned unchanged.
 *  2. Serverless always uses mget.
 *  3. Cloud deployments whose deployment ID starts with `a` or `b` use mget.
 *  4. Everything else uses update_by_query.
 *
 * @param opts - config, environment flags, optional deployment ID and logger
 * @returns the original config (case 1) or a shallow copy with `claim_strategy`
 *          and `poll_interval` overridden (cases 2-4)
 */
export function setClaimStrategy(opts: SetClaimStrategyOpts): TaskManagerConfig {
  const { config, deploymentId, isCloud, isServerless, logger } = opts;

  // Copies of the config with the strategy and matching poll interval applied.
  const withMget = (): TaskManagerConfig => ({
    ...config,
    claim_strategy: CLAIM_STRATEGY_MGET,
    poll_interval: MGET_DEFAULT_POLL_INTERVAL,
  });
  const withUpdateByQuery = (): TaskManagerConfig => ({
    ...config,
    claim_strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
    poll_interval: DEFAULT_POLL_INTERVAL,
  });

  // 1. Explicit configuration: nothing to decide, just report it.
  if (config.claim_strategy) {
    const deploymentSuffix = deploymentId ? ` for deployment ${deploymentId}` : '';
    logger.info(`Using claim strategy ${config.claim_strategy} as configured${deploymentSuffix}`);
    return config;
  }

  // 2. Serverless is always on mget.
  if (isServerless) {
    const serverlessSuffix = deploymentId ? ` for serverless deployment ${deploymentId}` : '';
    logger.info(`Setting claim strategy to mget${serverlessSuffix}`);
    return withMget();
  }

  // 3. Cloud deployment with a known ID: pick by the ID's leading character.
  //    (isServerless is necessarily false at this point.)
  if (isCloud && deploymentId) {
    const onboardedToMget = deploymentId.startsWith('a') || deploymentId.startsWith('b');
    if (onboardedToMget) {
      logger.info(`Setting claim strategy to mget for deployment ${deploymentId}`);
      return withMget();
    }
    logger.info(`Setting claim strategy to update_by_query for deployment ${deploymentId}`);
  }

  // 4. Fallback, which is silent unless step 3 already logged the decision.
  return withUpdateByQuery();
}

View file

@ -18,7 +18,7 @@ import {
ServiceStatusLevels,
CoreStatus,
} from '@kbn/core/server';
import type { CloudStart } from '@kbn/cloud-plugin/server';
import type { CloudSetup, CloudStart } from '@kbn/cloud-plugin/server';
import {
registerDeleteInactiveNodesTaskDefinition,
scheduleDeleteInactiveNodesTaskDefinition,
@ -45,6 +45,7 @@ import { metricsStream, Metrics } from './metrics';
import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector';
import { TaskPartitioner } from './lib/task_partitioner';
import { getDefaultCapacity } from './lib/get_default_capacity';
import { setClaimStrategy } from './lib/set_claim_strategy';
export interface TaskManagerSetupContract {
/**
@ -126,10 +127,18 @@ export class TaskManagerPlugin
public setup(
core: CoreSetup<TaskManagerStartContract, unknown>,
plugins: { usageCollection?: UsageCollectionSetup }
plugins: { cloud?: CloudSetup; usageCollection?: UsageCollectionSetup }
): TaskManagerSetupContract {
this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$);
this.config = setClaimStrategy({
config: this.config,
deploymentId: plugins.cloud?.deploymentId,
isServerless: this.initContext.env.packageInfo.buildFlavor === 'serverless',
isCloud: plugins.cloud?.isCloudEnabled ?? false,
logger: this.logger,
});
core.metrics
.getOpsMetrics$()
.pipe(distinctUntilChanged())
@ -137,7 +146,7 @@ export class TaskManagerPlugin
this.heapSizeLimit = metrics.process.memory.heap.size_limit;
});
setupSavedObjects(core.savedObjects, this.config);
setupSavedObjects(core.savedObjects);
this.taskManagerId = this.initContext.env.instanceUuid;
if (!this.taskManagerId) {
@ -301,9 +310,9 @@ export class TaskManagerPlugin
this.config!.claim_strategy
} isBackgroundTaskNodeOnly=${this.isNodeBackgroundTasksOnly()} heapSizeLimit=${
this.heapSizeLimit
} defaultCapacity=${defaultCapacity} autoCalculateDefaultEchCapacity=${
this.config.auto_calculate_default_ech_capacity
}`
} defaultCapacity=${defaultCapacity} pollingInterval=${
this.config!.poll_interval
} autoCalculateDefaultEchCapacity=${this.config.auto_calculate_default_ech_capacity}`
);
const managedConfiguration = createManagedConfiguration({

View file

@ -14,7 +14,7 @@ import type { Logger, ExecutionContextStart } from '@kbn/core/server';
import { Result, asErr, mapErr, asOk, map, mapOk } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig, CLAIM_STRATEGY_UPDATE_BY_QUERY } from './config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, TaskManagerConfig } from './config';
import {
TaskMarkRunning,
@ -141,7 +141,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.pool = new TaskPool({
logger,
strategy: config.claim_strategy,
strategy: config.claim_strategy!,
capacity$: capacityConfiguration$,
definitions: this.definitions,
});
@ -149,7 +149,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.taskClaiming = new TaskClaiming({
taskStore,
strategy: config.claim_strategy,
strategy: config.claim_strategy!,
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,
@ -238,7 +238,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
usageCounter: this.usageCounter,
config: this.config,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy,
strategy: this.config.claim_strategy!,
getPollInterval: () => this.currentPollInterval,
});
};

View file

@ -9,7 +9,6 @@ import type { SavedObjectsServiceSetup } from '@kbn/core/server';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { backgroundTaskNodeMapping, taskMappings } from './mappings';
import { getMigrations } from './migrations';
import { TaskManagerConfig } from '../config';
import { getOldestIdleActionTask } from '../queries/oldest_idle_action_task';
import { TASK_MANAGER_INDEX } from '../constants';
import { backgroundTaskNodeModelVersions, taskModelVersions } from './model_versions';
@ -17,10 +16,7 @@ import { backgroundTaskNodeModelVersions, taskModelVersions } from './model_vers
export const TASK_SO_NAME = 'task';
export const BACKGROUND_TASK_NODE_SO_NAME = 'background-task-node';
export function setupSavedObjects(
savedObjects: SavedObjectsServiceSetup,
config: TaskManagerConfig
) {
export function setupSavedObjects(savedObjects: SavedObjectsServiceSetup) {
savedObjects.registerType({
name: TASK_SO_NAME,
namespaceType: 'agnostic',