[Task Manager] allow multiple task claiming strategies (#171677)

see https://github.com/elastic/kibana/issues/155770

Make the task manager task claiming algorithm selectable, to allow
alternative implementations in the future. No other implementations are
provided here, this is setup for adding the next algorithm. Task Manager
behavior should not be changed by this PR - code has just be re-org'd.

This exposes a new config key which is exposed to Docker -
`xpack.task_manager.claim_strategy`. The only allowed value is
`default`. No plans at present to document this, or allow-list for the
cloud. We may end up changing the config key to just test for serverless
instead, when we implement the next task claiming algorithm (see
referenced issue ^^^, which is aimed for serverless).

The jest tests were coarsely re-org'd. Once we have > 1 algorithm, we'll
like want to re-org a bit more, so we can test all the implementations
"in a loop".
This commit is contained in:
Patrick Mueller 2023-11-28 13:28:56 -05:00 committed by GitHub
parent 4d1adc1b90
commit 9f5651d3bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1758 additions and 1472 deletions

View file

@ -402,6 +402,7 @@ kibana_vars=(
xpack.securitySolution.packagerTaskInterval
xpack.securitySolution.prebuiltRulesPackageVersion
xpack.spaces.maxSpaces
xpack.task_manager.claim_strategy
xpack.task_manager.max_attempts
xpack.task_manager.max_workers
xpack.task_manager.monitored_aggregated_stats_refresh_rate

View file

@ -13,6 +13,7 @@ describe('config validation', () => {
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "default",
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
@ -72,6 +73,7 @@ describe('config validation', () => {
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "default",
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
@ -129,6 +131,7 @@ describe('config validation', () => {
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "default",
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
@ -244,4 +247,13 @@ describe('config validation', () => {
configSchema.validate(config);
}).not.toThrowError();
});
test('the claim strategy is validated', () => {
const config = { claim_strategy: 'invalid-strategy' };
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"The claim strategy is invalid: Unknown task claiming strategy (invalid-strategy)"`
);
});
});

View file

@ -6,6 +6,7 @@
*/
import { schema, TypeOf } from '@kbn/config-schema';
import { getTaskClaimer } from './task_claimers';
export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_MAX_WORKERS = 10;
@ -25,6 +26,8 @@ export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds
// At the default poll interval of 3sec, this averages over the last 15sec.
export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;
export const CLAIM_STRATEGY_DEFAULT = 'default';
export const taskExecutionFailureThresholdSchema = schema.object(
{
error_threshold: schema.number({
@ -152,6 +155,7 @@ export const configSchema = schema.object(
max: 100,
min: 1,
}),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_DEFAULT }),
},
{
validate: (config) => {
@ -162,6 +166,11 @@ export const configSchema = schema.object(
) {
return `The specified monitored_stats_required_freshness (${config.monitored_stats_required_freshness}) is invalid, as it is below the poll_interval (${config.poll_interval})`;
}
try {
getTaskClaimer(config.claim_strategy);
} catch (err) {
return `The claim strategy is invalid: ${err.message}`;
}
},
}
);

View file

@ -85,6 +85,7 @@ describe('EphemeralTaskLifecycle', () => {
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
...config,
},
elasticsearchAndSOAvailability$,

View file

@ -80,6 +80,7 @@ describe('managed configuration', () => {
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
});
logger = context.logger.get('taskManager');

View file

@ -57,6 +57,7 @@ const config = {
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
};
const getStatsWithTimestamp = ({

View file

@ -73,6 +73,7 @@ const config: TaskManagerConfig = {
},
version_conflict_threshold: 80,
worker_utilization_running_average_window: 5,
claim_strategy: 'default',
};
describe('createAggregator', () => {

View file

@ -53,6 +53,7 @@ describe('Configuration Statistics Aggregator', () => {
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
};
const managedConfig = {

View file

@ -58,6 +58,7 @@ describe('createMonitoringStatsStream', () => {
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
};
it('returns the initial config used to configure Task Manager', async () => {

View file

@ -78,6 +78,7 @@ const pluginInitializerContextParams = {
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
};
describe('TaskManagerPlugin', () => {

View file

@ -83,6 +83,7 @@ describe('TaskPollingLifecycle', () => {
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
},
taskStore: mockTaskStore,
logger: taskManagerLogger,

View file

@ -41,7 +41,8 @@ import { identifyEsError, isEsCannotExecuteScriptError } from './lib/identify_es
import { BufferedTaskStore } from './buffered_task_store';
import { TaskTypeDictionary } from './task_type_dictionary';
import { delayOnClaimConflicts } from './polling';
import { TaskClaiming, ClaimOwnershipResult } from './queries/task_claiming';
import { TaskClaiming } from './queries/task_claiming';
import { ClaimOwnershipResult } from './task_claimers';
export interface ITaskEventEmitter<T> {
get events(): Observable<T>;
@ -132,6 +133,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.taskClaiming = new TaskClaiming({
taskStore,
strategy: config.claim_strategy,
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,

View file

@ -8,41 +8,30 @@
/*
* This module contains helpers for managing the task manager storage layer.
*/
import apm from 'elastic-apm-node';
import minimatch from 'minimatch';
import { Subject, Observable, from, of } from 'rxjs';
import { map, mergeScan } from 'rxjs/operators';
import { groupBy, pick, isPlainObject } from 'lodash';
import { Subject, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { groupBy, isPlainObject } from 'lodash';
import { Logger } from '@kbn/core/server';
import { asOk, asErr, Result } from '../lib/result_type';
import { ConcreteTaskInstance } from '../task';
import { TaskClaim, asTaskClaimEvent, startTaskTimer, TaskTiming } from '../task_events';
import { shouldBeOneOf, mustBeAllOf, filterDownBy, matchesClauses } from './query_clauses';
import { TaskClaim } from '../task_events';
import {
updateFieldsAndMarkAsFailed,
IdleTaskWithExpiredRunAt,
InactiveTasks,
RunningOrClaimingTaskWithExpiredRetryAt,
SortByRunAtAndRetryAt,
tasksClaimedByOwner,
tasksOfType,
EnabledTask,
} from './mark_available_tasks_as_claimed';
import { TaskTypeDictionary } from '../task_type_dictionary';
import {
correctVersionConflictsForContinuation,
TaskStore,
UpdateByQueryResult,
SearchOpts,
} from '../task_store';
import { TaskStore, UpdateByQueryResult } from '../task_store';
import { FillPoolResult } from '../lib/fill_pool';
import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running';
import {
TaskClaimerOpts,
TaskClaimerFn,
ClaimOwnershipResult,
getTaskClaimer,
} from '../task_claimers';
export type { ClaimOwnershipResult } from '../task_claimers';
export interface TaskClaimingOpts {
logger: Logger;
strategy: string;
definitions: TaskTypeDictionary;
unusedTypes: string[];
taskStore: TaskStore;
@ -67,31 +56,25 @@ export interface FetchResult {
docs: ConcreteTaskInstance[];
}
export interface ClaimOwnershipResult {
stats: {
tasksUpdated: number;
tasksConflicted: number;
tasksClaimed: number;
};
docs: ConcreteTaskInstance[];
timing?: TaskTiming;
export function isClaimOwnershipResult(result: unknown): result is ClaimOwnershipResult {
return (
isPlainObject((result as ClaimOwnershipResult).stats) &&
Array.isArray((result as ClaimOwnershipResult).docs)
);
}
export const isClaimOwnershipResult = (result: unknown): result is ClaimOwnershipResult =>
isPlainObject((result as ClaimOwnershipResult).stats) &&
Array.isArray((result as ClaimOwnershipResult).docs);
enum BatchConcurrency {
export enum BatchConcurrency {
Unlimited,
Limited,
}
type TaskClaimingBatches = Array<UnlimitedBatch | LimitedBatch>;
interface TaskClaimingBatch<Concurrency extends BatchConcurrency, TaskType> {
export type TaskClaimingBatches = Array<UnlimitedBatch | LimitedBatch>;
export interface TaskClaimingBatch<Concurrency extends BatchConcurrency, TaskType> {
concurrency: Concurrency;
tasksTypes: TaskType;
}
type UnlimitedBatch = TaskClaimingBatch<BatchConcurrency.Unlimited, Set<string>>;
type LimitedBatch = TaskClaimingBatch<BatchConcurrency.Limited, string>;
export type UnlimitedBatch = TaskClaimingBatch<BatchConcurrency.Unlimited, Set<string>>;
export type LimitedBatch = TaskClaimingBatch<BatchConcurrency.Limited, string>;
export const TASK_MANAGER_MARK_AS_CLAIMED = 'mark-available-tasks-as-claimed';
@ -108,6 +91,7 @@ export class TaskClaiming {
private readonly taskMaxAttempts: Record<string, number>;
private readonly excludedTaskTypes: string[];
private readonly unusedTypes: string[];
private readonly taskClaimer: TaskClaimerFn;
/**
* Constructs a new TaskStore.
@ -120,12 +104,12 @@ export class TaskClaiming {
this.maxAttempts = opts.maxAttempts;
this.taskStore = opts.taskStore;
this.getCapacity = opts.getCapacity;
this.logger = opts.logger;
this.logger = opts.logger.get('taskClaiming');
this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions);
this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions));
this.excludedTaskTypes = opts.excludedTaskTypes;
this.unusedTypes = opts.unusedTypes;
this.taskClaimer = getTaskClaimer(opts.strategy);
this.events$ = new Subject<TaskClaim>();
}
@ -177,224 +161,43 @@ export class TaskClaiming {
return this.events$;
}
private emitEvents = (events: TaskClaim[]) => {
events.forEach((event) => this.events$.next(event));
};
public claimAvailableTasksIfCapacityIsAvailable(
claimingOptions: Omit<OwnershipClaimingOpts, 'size' | 'taskTypes'>
): Observable<Result<ClaimOwnershipResult, FillPoolResult>> {
if (this.getCapacity()) {
return this.claimAvailableTasks(claimingOptions).pipe(
map((claimResult) => asOk(claimResult))
);
const opts: TaskClaimerOpts = {
batches: this.getClaimingBatches(),
claimOwnershipUntil: claimingOptions.claimOwnershipUntil,
taskStore: this.taskStore,
events$: this.events$,
getCapacity: this.getCapacity,
unusedTypes: this.unusedTypes,
definitions: this.definitions,
taskMaxAttempts: this.taskMaxAttempts,
excludedTaskTypes: this.excludedTaskTypes,
};
return this.taskClaimer(opts).pipe(map((claimResult) => asOk(claimResult)));
}
this.logger.debug(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.`
);
return of(asErr(FillPoolResult.NoAvailableWorkers));
}
public claimAvailableTasks({
claimOwnershipUntil,
}: Omit<OwnershipClaimingOpts, 'size' | 'taskTypes'>): Observable<ClaimOwnershipResult> {
const initialCapacity = this.getCapacity();
return from(this.getClaimingBatches()).pipe(
mergeScan(
(accumulatedResult, batch) => {
const stopTaskTimer = startTaskTimer();
const capacity = Math.min(
initialCapacity - accumulatedResult.stats.tasksClaimed,
isLimited(batch) ? this.getCapacity(batch.tasksTypes) : this.getCapacity()
);
// if we have no more capacity, short circuit here
if (capacity <= 0) {
return of(accumulatedResult);
}
return from(
this.executeClaimAvailableTasks({
claimOwnershipUntil,
size: capacity,
taskTypes: isLimited(batch) ? new Set([batch.tasksTypes]) : batch.tasksTypes,
}).then((result) => {
const { stats, docs } = accumulateClaimOwnershipResults(accumulatedResult, result);
stats.tasksConflicted = correctVersionConflictsForContinuation(
stats.tasksClaimed,
stats.tasksConflicted,
initialCapacity
);
return { stats, docs, timing: stopTaskTimer() };
})
);
},
// initialise the accumulation with no results
accumulateClaimOwnershipResults(),
// only run one batch at a time
1
)
);
}
private executeClaimAvailableTasks = async ({
claimOwnershipUntil,
size,
taskTypes,
}: OwnershipClaimingOpts): Promise<ClaimOwnershipResult> => {
const { updated: tasksUpdated, version_conflicts: tasksConflicted } =
await this.markAvailableTasksAsClaimed({
claimOwnershipUntil,
size,
taskTypes,
});
const docs = tasksUpdated > 0 ? await this.sweepForClaimedTasks(taskTypes, size) : [];
this.emitEvents(docs.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))));
const stats = {
tasksUpdated,
tasksConflicted,
tasksClaimed: docs.length,
};
return {
stats,
docs,
};
};
private isTaskTypeExcluded(taskType: string) {
for (const excludedType of this.excludedTaskTypes) {
if (minimatch(taskType, excludedType)) {
return true;
}
}
return false;
}
private async markAvailableTasksAsClaimed({
claimOwnershipUntil,
size,
taskTypes,
}: OwnershipClaimingOpts): Promise<UpdateByQueryResult> {
const { taskTypesToSkip = [], taskTypesToClaim = [] } = groupBy(
this.definitions.getAllTypes(),
(type) =>
taskTypes.has(type) && !this.isTaskTypeExcluded(type)
? 'taskTypesToClaim'
: 'taskTypesToSkip'
);
const queryForScheduledTasks = mustBeAllOf(
// Task must be enabled
EnabledTask,
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
);
const sort: NonNullable<SearchOpts['sort']> = [SortByRunAtAndRetryAt];
const query = matchesClauses(queryForScheduledTasks, filterDownBy(InactiveTasks));
const script = updateFieldsAndMarkAsFailed({
fieldUpdates: {
ownerId: this.taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimableTaskTypes: taskTypesToClaim,
skippedTaskTypes: taskTypesToSkip,
unusedTaskTypes: this.unusedTypes,
taskMaxAttempts: pick(this.taskMaxAttempts, taskTypesToClaim),
});
const apmTrans = apm.startTransaction(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
try {
const result = await this.taskStore.updateByQuery(
{
query,
script,
sort,
},
{
max_docs: size,
}
);
apmTrans.end('success');
return result;
} catch (err) {
apmTrans.end('failure');
throw err;
}
}
/**
* Fetches tasks from the index, which are owned by the current Kibana instance
*/
private async sweepForClaimedTasks(
taskTypes: Set<string>,
size: number
): Promise<ConcreteTaskInstance[]> {
const claimedTasksQuery = tasksClaimedByOwner(
this.taskStore.taskManagerId,
tasksOfType([...taskTypes])
);
const { docs } = await this.taskStore.fetch({
query: claimedTasksQuery,
size,
sort: SortByRunAtAndRetryAt,
seq_no_primary_term: true,
});
return docs;
}
}
const emptyClaimOwnershipResult = () => {
return {
stats: {
tasksUpdated: 0,
tasksConflicted: 0,
tasksClaimed: 0,
tasksRejected: 0,
},
docs: [],
};
};
function accumulateClaimOwnershipResults(
prev: ClaimOwnershipResult = emptyClaimOwnershipResult(),
next?: ClaimOwnershipResult
) {
if (next) {
const { stats, docs, timing } = next;
const res = {
stats: {
tasksUpdated: stats.tasksUpdated + prev.stats.tasksUpdated,
tasksConflicted: stats.tasksConflicted + prev.stats.tasksConflicted,
tasksClaimed: stats.tasksClaimed + prev.stats.tasksClaimed,
},
docs,
timing,
};
return res;
}
return prev;
}
function isLimited(
export function isLimited(
batch: TaskClaimingBatch<BatchConcurrency.Limited | BatchConcurrency.Unlimited, unknown>
): batch is LimitedBatch {
return batch.concurrency === BatchConcurrency.Limited;
}
function asLimited(tasksType: string): LimitedBatch {
return {
concurrency: BatchConcurrency.Limited,
tasksTypes: tasksType,
};
}
function asUnlimited(tasksTypes: Set<string>): UnlimitedBatch {
return {
concurrency: BatchConcurrency.Unlimited,

View file

@ -0,0 +1,20 @@
task_claimers
========================================================================
This directory contains code that claims the next tasks to run.
The code is structured to support multiple strategies, but currently
only supports a `default` strategy.
`default` task claiming strategy
------------------------------------------------------------------------
This has been the strategy for task manager for ... ever? The basic
idea:
- Run an update by query, for number of available workers, to "mark"
task documents as claimed, by setting task state to `claiming`.
We can do some limited per-task logic in that update script.
- A search is then run on the documents updated from the update by
query.

View file

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getTaskClaimer } from '.';
import { claimAvailableTasksDefault } from './strategy_default';
describe('task_claimers/index', () => {
describe('getTaskClaimer()', () => {
test('returns expected result for default', () => {
const taskClaimer = getTaskClaimer('default');
expect(taskClaimer).toBe(claimAvailableTasksDefault);
});
test('throws error for unsupported parameter', () => {
expect(() => getTaskClaimer('not-supported')).toThrowErrorMatchingInlineSnapshot(
`"Unknown task claiming strategy (not-supported)"`
);
});
});
});

View file

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Subject, Observable } from 'rxjs';
import { TaskStore } from '../task_store';
import { TaskClaim, TaskTiming } from '../task_events';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { TaskClaimingBatches } from '../queries/task_claiming';
import { ConcreteTaskInstance } from '../task';
import { claimAvailableTasksDefault } from './strategy_default';
import { CLAIM_STRATEGY_DEFAULT } from '../config';
export interface TaskClaimerOpts {
getCapacity: (taskType?: string | undefined) => number;
claimOwnershipUntil: Date;
batches: TaskClaimingBatches;
events$: Subject<TaskClaim>;
taskStore: TaskStore;
definitions: TaskTypeDictionary;
unusedTypes: string[];
excludedTaskTypes: string[];
taskMaxAttempts: Record<string, number>;
}
export interface ClaimOwnershipResult {
stats: {
tasksUpdated: number;
tasksConflicted: number;
tasksClaimed: number;
};
docs: ConcreteTaskInstance[];
timing?: TaskTiming;
}
export type TaskClaimerFn = (opts: TaskClaimerOpts) => Observable<ClaimOwnershipResult>;
export function getTaskClaimer(strategy: string): TaskClaimerFn {
switch (strategy) {
case CLAIM_STRATEGY_DEFAULT:
return claimAvailableTasksDefault;
}
throw new Error(`Unknown task claiming strategy (${strategy})`);
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,255 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/*
* This module contains helpers for managing the task manager storage layer.
*/
import apm from 'elastic-apm-node';
import minimatch from 'minimatch';
import { Subject, Observable, from, of } from 'rxjs';
import { mergeScan } from 'rxjs/operators';
import { groupBy, pick } from 'lodash';
import { asOk } from '../lib/result_type';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { TaskClaimerOpts, ClaimOwnershipResult } from '.';
import { ConcreteTaskInstance } from '../task';
import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running';
import { isLimited, TASK_MANAGER_MARK_AS_CLAIMED } from '../queries/task_claiming';
import { TaskClaim, asTaskClaimEvent, startTaskTimer } from '../task_events';
import { shouldBeOneOf, mustBeAllOf, filterDownBy, matchesClauses } from '../queries/query_clauses';
import {
updateFieldsAndMarkAsFailed,
IdleTaskWithExpiredRunAt,
InactiveTasks,
RunningOrClaimingTaskWithExpiredRetryAt,
SortByRunAtAndRetryAt,
tasksClaimedByOwner,
tasksOfType,
EnabledTask,
} from '../queries/mark_available_tasks_as_claimed';
import {
correctVersionConflictsForContinuation,
TaskStore,
UpdateByQueryResult,
SearchOpts,
} from '../task_store';
interface OwnershipClaimingOpts {
claimOwnershipUntil: Date;
size: number;
taskTypes: Set<string>;
taskStore: TaskStore;
events$: Subject<TaskClaim>;
definitions: TaskTypeDictionary;
unusedTypes: string[];
excludedTaskTypes: string[];
taskMaxAttempts: Record<string, number>;
}
export function claimAvailableTasksDefault(
opts: TaskClaimerOpts
): Observable<ClaimOwnershipResult> {
const { getCapacity, claimOwnershipUntil, batches, events$, taskStore } = opts;
const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts;
const initialCapacity = getCapacity();
return from(batches).pipe(
mergeScan(
(accumulatedResult, batch) => {
const stopTaskTimer = startTaskTimer();
const capacity = Math.min(
initialCapacity - accumulatedResult.stats.tasksClaimed,
isLimited(batch) ? getCapacity(batch.tasksTypes) : getCapacity()
);
// if we have no more capacity, short circuit here
if (capacity <= 0) {
return of(accumulatedResult);
}
return from(
executeClaimAvailableTasks({
claimOwnershipUntil,
size: capacity,
events$,
taskTypes: isLimited(batch) ? new Set([batch.tasksTypes]) : batch.tasksTypes,
taskStore,
definitions,
unusedTypes,
excludedTaskTypes,
taskMaxAttempts,
}).then((result) => {
const { stats, docs } = accumulateClaimOwnershipResults(accumulatedResult, result);
stats.tasksConflicted = correctVersionConflictsForContinuation(
stats.tasksClaimed,
stats.tasksConflicted,
initialCapacity
);
return { stats, docs, timing: stopTaskTimer() };
})
);
},
// initialise the accumulation with no results
accumulateClaimOwnershipResults(),
// only run one batch at a time
1
)
);
}
async function executeClaimAvailableTasks(
opts: OwnershipClaimingOpts
): Promise<ClaimOwnershipResult> {
const { taskStore, size, taskTypes, events$ } = opts;
const { updated: tasksUpdated, version_conflicts: tasksConflicted } =
await markAvailableTasksAsClaimed(opts);
const docs = tasksUpdated > 0 ? await sweepForClaimedTasks(taskStore, taskTypes, size) : [];
emitEvents(
events$,
docs.map((doc) => asTaskClaimEvent(doc.id, asOk(doc)))
);
const stats = {
tasksUpdated,
tasksConflicted,
tasksClaimed: docs.length,
};
return {
stats,
docs,
};
}
function emitEvents(events$: Subject<TaskClaim>, events: TaskClaim[]) {
events.forEach((event) => events$.next(event));
}
function isTaskTypeExcluded(excludedTaskTypes: string[], taskType: string) {
for (const excludedType of excludedTaskTypes) {
if (minimatch(taskType, excludedType)) {
return true;
}
}
return false;
}
async function markAvailableTasksAsClaimed({
definitions,
excludedTaskTypes,
taskStore,
claimOwnershipUntil,
size,
taskTypes,
unusedTypes,
taskMaxAttempts,
}: OwnershipClaimingOpts): Promise<UpdateByQueryResult> {
const { taskTypesToSkip = [], taskTypesToClaim = [] } = groupBy(
definitions.getAllTypes(),
(type) =>
taskTypes.has(type) && !isTaskTypeExcluded(excludedTaskTypes, type)
? 'taskTypesToClaim'
: 'taskTypesToSkip'
);
const queryForScheduledTasks = mustBeAllOf(
// Task must be enabled
EnabledTask,
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
);
const sort: NonNullable<SearchOpts['sort']> = [SortByRunAtAndRetryAt];
const query = matchesClauses(queryForScheduledTasks, filterDownBy(InactiveTasks));
const script = updateFieldsAndMarkAsFailed({
fieldUpdates: {
ownerId: taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimableTaskTypes: taskTypesToClaim,
skippedTaskTypes: taskTypesToSkip,
unusedTaskTypes: unusedTypes,
taskMaxAttempts: pick(taskMaxAttempts, taskTypesToClaim),
});
const apmTrans = apm.startTransaction(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
try {
const result = await taskStore.updateByQuery(
{
query,
script,
sort,
},
{
max_docs: size,
}
);
apmTrans.end('success');
return result;
} catch (err) {
apmTrans.end('failure');
throw err;
}
}
async function sweepForClaimedTasks(
taskStore: TaskStore,
taskTypes: Set<string>,
size: number
): Promise<ConcreteTaskInstance[]> {
const claimedTasksQuery = tasksClaimedByOwner(
taskStore.taskManagerId,
tasksOfType([...taskTypes])
);
const { docs } = await taskStore.fetch({
query: claimedTasksQuery,
size,
sort: SortByRunAtAndRetryAt,
seq_no_primary_term: true,
});
return docs;
}
function emptyClaimOwnershipResult() {
return {
stats: {
tasksUpdated: 0,
tasksConflicted: 0,
tasksClaimed: 0,
tasksRejected: 0,
},
docs: [],
};
}
function accumulateClaimOwnershipResults(
prev: ClaimOwnershipResult = emptyClaimOwnershipResult(),
next?: ClaimOwnershipResult
) {
if (next) {
const { stats, docs, timing } = next;
const res = {
stats: {
tasksUpdated: stats.tasksUpdated + prev.stats.tasksUpdated,
tasksConflicted: stats.tasksConflicted + prev.stats.tasksConflicted,
tasksClaimed: stats.tasksClaimed + prev.stats.tasksClaimed,
},
docs,
timing,
};
return res;
}
return prev;
}