Rename task claimers (#190542)

In this PR, I'm renaming the task managers as we prepare to rollout the
`mget` task claiming strategy as the default.

Rename:
- `unsafe_mget` -> `mget`
- `default` -> `update_by_query`

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Mike Côté 2024-08-21 11:09:10 -04:00 committed by GitHub
parent 730f8eae87
commit fdae1348df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 71 additions and 66 deletions

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { configSchema, CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from './config';
import { configSchema, CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET } from './config';
describe('config validation', () => {
test('task manager defaults', () => {
@ -13,7 +13,7 @@ describe('config validation', () => {
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "default",
"claim_strategy": "update_by_query",
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
@ -70,7 +70,7 @@ describe('config validation', () => {
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "default",
"claim_strategy": "update_by_query",
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
@ -125,7 +125,7 @@ describe('config validation', () => {
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "default",
"claim_strategy": "update_by_query",
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
@ -244,7 +244,7 @@ describe('config validation', () => {
});
test('default claim strategy defaults poll interval to 3000ms', () => {
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_DEFAULT });
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY });
expect(result.poll_interval).toEqual(3000);
});

View file

@ -29,8 +29,8 @@ export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds
// At the default poll interval of 3sec, this averages over the last 15sec.
export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;
export const CLAIM_STRATEGY_DEFAULT = 'default';
export const CLAIM_STRATEGY_MGET = 'unsafe_mget';
export const CLAIM_STRATEGY_UPDATE_BY_QUERY = 'update_by_query';
export const CLAIM_STRATEGY_MGET = 'mget';
export const taskExecutionFailureThresholdSchema = schema.object(
{
@ -168,7 +168,7 @@ export const configSchema = schema.object(
max: 100,
min: 1,
}),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_DEFAULT }),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_UPDATE_BY_QUERY }),
request_timeouts: requestTimeoutsConfig,
},
{

View file

@ -315,7 +315,7 @@ describe('managed configuration', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
request_timeouts: {
update_by_query: 1000,
},

View file

@ -58,7 +58,7 @@ describe('switch task claiming strategies', () => {
let kibanaServer = setupResultDefault.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('default');
expect(taskClaimingOpts.strategy).toBe('update_by_query');
mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
@ -93,14 +93,14 @@ describe('switch task claiming strategies', () => {
const setupResultMget = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
},
},
});
kibanaServer = setupResultMget.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
expect(taskClaimingOpts.strategy).toBe('mget');
// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
@ -136,7 +136,7 @@ describe('switch task claiming strategies', () => {
const setupResultMget = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
},
},
});
@ -144,7 +144,7 @@ describe('switch task claiming strategies', () => {
let kibanaServer = setupResultMget.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
expect(taskClaimingOpts.strategy).toBe('mget');
mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
@ -180,7 +180,7 @@ describe('switch task claiming strategies', () => {
kibanaServer = setupResultDefault.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('default');
expect(taskClaimingOpts.strategy).toBe('update_by_query');
// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
@ -218,7 +218,7 @@ describe('switch task claiming strategies', () => {
let kibanaServer = setupResultDefault.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('default');
expect(taskClaimingOpts.strategy).toBe('update_by_query');
mockTaskTypeRunFn.mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
@ -255,14 +255,14 @@ describe('switch task claiming strategies', () => {
const setupResultMget = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
},
},
});
kibanaServer = setupResultMget.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
expect(taskClaimingOpts.strategy).toBe('mget');
// task doc should still exist and be running
const task = await kibanaServer.coreStart.elasticsearch.client.asInternalUser.get<{
@ -294,7 +294,7 @@ describe('switch task claiming strategies', () => {
const setupResultMget = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
},
},
});
@ -302,7 +302,7 @@ describe('switch task claiming strategies', () => {
let kibanaServer = setupResultMget.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
expect(taskClaimingOpts.strategy).toBe('mget');
mockTaskTypeRunFn.mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
@ -340,7 +340,7 @@ describe('switch task claiming strategies', () => {
kibanaServer = setupResultDefault.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('default');
expect(taskClaimingOpts.strategy).toBe('update_by_query');
// task doc should still exist and be running
const task = await kibanaServer.coreStart.elasticsearch.client.asInternalUser.get<{

View file

@ -13,7 +13,7 @@ import {
ADJUST_THROUGHPUT_INTERVAL,
} from './create_managed_configuration';
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
@ -148,7 +148,7 @@ describe('createManagedConfiguration()', () => {
describe('capacity configuration', () => {
function setupScenario(
startingCapacity: number,
claimStrategy: string = CLAIM_STRATEGY_DEFAULT
claimStrategy: string = CLAIM_STRATEGY_UPDATE_BY_QUERY
) {
const errors$ = new Subject<Error>();
const subscription = jest.fn();

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY } from '../config';
import { getDefaultCapacity } from './get_default_capacity';
describe('getDefaultCapacity', () => {
@ -58,7 +58,7 @@ describe('getDefaultCapacity', () => {
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_DEFAULT,
claimStrategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
})
).toBe(DEFAULT_CAPACITY);
@ -68,7 +68,7 @@ describe('getDefaultCapacity', () => {
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_DEFAULT,
claimStrategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
})
).toBe(DEFAULT_CAPACITY);
});

View file

@ -10,7 +10,7 @@ import { pick, merge } from 'lodash';
import { map, startWith } from 'rxjs';
import { JsonObject } from '@kbn/utility-types';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { CLAIM_STRATEGY_DEFAULT, TaskManagerConfig } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, TaskManagerConfig } from '../config';
import { ManagedConfiguration } from '../lib/create_managed_configuration';
import { getCapacityInCost, getCapacityInWorkers } from '../task_pool';
@ -41,7 +41,7 @@ export function createConfigurationAggregator(
): AggregatedStatProvider<ConfigStat> {
return combineLatest([
of(pick(config, ...CONFIG_FIELDS_TO_EXPOSE)),
of({ claim_strategy: config.claim_strategy ?? CLAIM_STRATEGY_DEFAULT }),
of({ claim_strategy: config.claim_strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY }),
managedConfig.pollIntervalConfiguration$.pipe(
startWith(config.poll_interval),
map<number, Pick<TaskManagerConfig, 'poll_interval'>>((pollInterval) => ({

View file

@ -134,7 +134,7 @@ describe('TaskPollingLifecycle', () => {
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
});
test('provides TaskClaiming with the capacity available when strategy = CLAIM_STRATEGY_DEFAULT', () => {
test('provides TaskClaiming with the capacity available when strategy = CLAIM_STRATEGY_UPDATE_BY_QUERY', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const capacity$ = new Subject<number>();

View file

@ -14,7 +14,7 @@ import type { Logger, ExecutionContextStart } from '@kbn/core/server';
import { Result, asErr, mapErr, asOk, map, mapOk } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig, CLAIM_STRATEGY_DEFAULT } from './config';
import { TaskManagerConfig, CLAIM_STRATEGY_UPDATE_BY_QUERY } from './config';
import {
TaskMarkRunning,
@ -155,7 +155,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
const { poll_interval: pollInterval, claim_strategy: claimStrategy } = config;
let pollIntervalDelay$: Observable<number> | undefined;
if (claimStrategy === CLAIM_STRATEGY_DEFAULT) {
if (claimStrategy === CLAIM_STRATEGY_UPDATE_BY_QUERY) {
pollIntervalDelay$ = delayOnClaimConflicts(
capacityConfiguration$,
pollIntervalConfiguration$,

View file

@ -85,7 +85,7 @@ describe('TaskClaiming', () => {
});
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Unknown task claiming strategy "non-default", falling back to default'
'Unknown task claiming strategy "non-default", falling back to update_by_query'
);
});

View file

@ -7,7 +7,7 @@
import { getTaskClaimer } from '.';
import { mockLogger } from '../test_utils';
import { claimAvailableTasksDefault } from './strategy_default';
import { claimAvailableTasksUpdateByQuery } from './strategy_update_by_query';
import { claimAvailableTasksMget } from './strategy_mget';
const logger = mockLogger();
@ -16,23 +16,23 @@ describe('task_claimers/index', () => {
beforeEach(() => jest.resetAllMocks());
describe('getTaskClaimer()', () => {
test('returns expected result for default', () => {
const taskClaimer = getTaskClaimer(logger, 'default');
expect(taskClaimer).toBe(claimAvailableTasksDefault);
test('returns expected result for update_by_query', () => {
const taskClaimer = getTaskClaimer(logger, 'update_by_query');
expect(taskClaimer).toBe(claimAvailableTasksUpdateByQuery);
expect(logger.warn).not.toHaveBeenCalled();
});
test('returns expected result for mget', () => {
const taskClaimer = getTaskClaimer(logger, 'unsafe_mget');
const taskClaimer = getTaskClaimer(logger, 'mget');
expect(taskClaimer).toBe(claimAvailableTasksMget);
expect(logger.warn).not.toHaveBeenCalled();
});
test('logs a warning for unsupported parameter', () => {
const taskClaimer = getTaskClaimer(logger, 'not-supported');
expect(taskClaimer).toBe(claimAvailableTasksDefault);
expect(taskClaimer).toBe(claimAvailableTasksUpdateByQuery);
expect(logger.warn).toHaveBeenCalledWith(
'Unknown task claiming strategy "not-supported", falling back to default'
'Unknown task claiming strategy "not-supported", falling back to update_by_query'
);
});
});

View file

@ -13,9 +13,9 @@ import { TaskClaim, TaskTiming } from '../task_events';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { TaskClaimingBatches } from '../queries/task_claiming';
import { ConcreteTaskInstance } from '../task';
import { claimAvailableTasksDefault } from './strategy_default';
import { claimAvailableTasksUpdateByQuery } from './strategy_update_by_query';
import { claimAvailableTasksMget } from './strategy_mget';
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET } from '../config';
import { TaskPartitioner } from '../lib/task_partitioner';
export interface TaskClaimerOpts {
@ -49,17 +49,17 @@ let WarnedOnInvalidClaimer = false;
export function getTaskClaimer(logger: Logger, strategy: string): TaskClaimerFn {
switch (strategy) {
case CLAIM_STRATEGY_DEFAULT:
return claimAvailableTasksDefault;
case CLAIM_STRATEGY_UPDATE_BY_QUERY:
return claimAvailableTasksUpdateByQuery;
case CLAIM_STRATEGY_MGET:
return claimAvailableTasksMget;
}
if (!WarnedOnInvalidClaimer) {
WarnedOnInvalidClaimer = true;
logger.warn(`Unknown task claiming strategy "${strategy}", falling back to default`);
logger.warn(`Unknown task claiming strategy "${strategy}", falling back to update_by_query`);
}
return claimAvailableTasksDefault;
return claimAvailableTasksUpdateByQuery;
}
export function getEmptyClaimOwnershipResult(): ClaimOwnershipResult {

View file

@ -1248,7 +1248,7 @@ if (doc['task.runAt'].size()!=0) {
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
strategy: 'default',
strategy: 'update_by_query',
definitions,
excludedTaskTypes: [],
unusedTypes: [],

View file

@ -53,7 +53,7 @@ interface OwnershipClaimingOpts {
taskMaxAttempts: Record<string, number>;
}
export function claimAvailableTasksDefault(
export function claimAvailableTasksUpdateByQuery(
opts: TaskClaimerOpts
): Observable<ClaimOwnershipResult> {
const { getCapacity, claimOwnershipUntil, batches, events$, taskStore } = opts;

View file

@ -18,7 +18,7 @@ import { TaskCost } from '../task';
import * as CostCapacityModule from './cost_capacity';
import * as WorkerCapacityModule from './worker_capacity';
import { capacityMock } from './capacity.mock';
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET } from '../config';
import { mockRun, mockTask } from './test_utils';
import { TaskTypeDictionary } from '../task_type_dictionary';
@ -82,7 +82,12 @@ describe('TaskPool', () => {
});
test('uses WorkerCapacity to calculate capacity when strategy is default', () => {
new TaskPool({ capacity$: of(20), definitions, logger, strategy: CLAIM_STRATEGY_DEFAULT });
new TaskPool({
capacity$: of(20),
definitions,
logger,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
expect(CostCapacityModule.CostCapacity).not.toHaveBeenCalled();
expect(WorkerCapacityModule.WorkerCapacity).toHaveBeenCalledTimes(1);
@ -96,13 +101,13 @@ describe('TaskPool', () => {
});
});
describe('with CLAIM_STRATEGY_DEFAULT', () => {
describe('with CLAIM_STRATEGY_UPDATE_BY_QUERY', () => {
test('usedCapacity is the number running tasks', async () => {
const pool = new TaskPool({
capacity$: of(10),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const result = await pool.run([{ ...mockTask() }, { ...mockTask() }, { ...mockTask() }]);
@ -116,7 +121,7 @@ describe('TaskPool', () => {
capacity$: of(10),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const result = await pool.run([{ ...mockTask() }, { ...mockTask() }, { ...mockTask() }]);
@ -131,7 +136,7 @@ describe('TaskPool', () => {
capacity$,
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
expect(pool.availableCapacity()).toEqual(0);
@ -144,7 +149,7 @@ describe('TaskPool', () => {
capacity$: of(2),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const shouldRun = mockRun();
@ -167,7 +172,7 @@ describe('TaskPool', () => {
capacity$: of(3),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const taskFailedToMarkAsRunning = mockTask();
@ -191,7 +196,7 @@ describe('TaskPool', () => {
capacity$: of(3),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const taskFailedToRun = mockTask();
@ -215,7 +220,7 @@ describe('TaskPool', () => {
capacity$: of(3),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const taskFailedToRun = mockTask();
@ -238,7 +243,7 @@ describe('TaskPool', () => {
capacity$: of(1),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const taskFailedToRun = mockTask();
@ -257,7 +262,7 @@ describe('TaskPool', () => {
capacity$: of(1),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const firstWork = resolvable();
@ -304,7 +309,7 @@ describe('TaskPool', () => {
capacity$: of(2),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const haltUntilWeAfterFirstRun = resolvable();
@ -372,7 +377,7 @@ describe('TaskPool', () => {
capacity$: of(1),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const taskIsRunning = resolvable();
@ -421,7 +426,7 @@ describe('TaskPool', () => {
capacity$: of(10),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const cancelled = resolvable();
@ -459,7 +464,7 @@ describe('TaskPool', () => {
capacity$: of(2),
definitions,
logger,
strategy: CLAIM_STRATEGY_DEFAULT,
strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
const shouldRun = mockRun();

View file

@ -145,7 +145,7 @@ export default function ({ getService }: FtrProviderContext) {
as_workers: 10,
as_cost: 20,
},
claim_strategy: 'default',
claim_strategy: 'update_by_query',
});
});

View file

@ -28,7 +28,7 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
...integrationConfig.get('kbnTestServer.serverArgs'),
'--xpack.eventLog.logEntries=true',
'--xpack.eventLog.indexEntries=true',
'--xpack.task_manager.claim_strategy="unsafe_mget"',
'--xpack.task_manager.claim_strategy="mget"',
'--xpack.task_manager.monitored_aggregated_stats_refresh_rate=5000',
'--xpack.task_manager.ephemeral_tasks.enabled=false',
'--xpack.task_manager.ephemeral_tasks.request_capacity=100',

View file

@ -145,7 +145,7 @@ export default function ({ getService }: FtrProviderContext) {
as_workers: 10,
as_cost: 20,
},
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
});
});