Cleanup unused code for claiming tasks by id (#144408)

In this PR, I'm cleaning up code within Task Manager regarding claiming
tasks by id that is no longer used. The code was previously used for
running alerting rules right away but recently `runNow` got replaced
with `runSoon` which no longer needs this code to function. For more
info on the previous change, see
https://github.com/elastic/kibana/issues/133550.
This commit is contained in:
Mike Côté 2022-11-09 06:37:13 -05:00 committed by GitHub
parent 08eb63af67
commit daf9322326
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 22 additions and 755 deletions

View file

@@ -264,7 +264,6 @@ export class TaskManagerPlugin
taskStore,
middleware: this.middleware,
ephemeralTaskLifecycle: this.ephemeralTaskLifecycle,
definitions: this.definitions,
taskManagerId: taskStore.taskManagerId,
});

View file

@@ -206,9 +206,7 @@ describe('TaskPollingLifecycle', () => {
)
);
expect(
isOk(await getFirstAsPromise(claimAvailableTasks([], taskClaiming, logger)))
).toBeTruthy();
expect(isOk(await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger)))).toBeTruthy();
expect(taskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);
});
@@ -266,7 +264,7 @@ describe('TaskPollingLifecycle', () => {
})
);
const err = await getFirstAsPromise(claimAvailableTasks([], taskClaiming, logger));
const err = await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger));
expect(isErr(err)).toBeTruthy();
expect((err as Err<FillPoolResult>).error).toEqual(FillPoolResult.Failed);

View file

@@ -254,11 +254,7 @@ export class TaskPollingLifecycle {
return fillPool(
// claim available tasks
() => {
return claimAvailableTasks(
tasksToClaim.splice(0, this.pool.availableWorkers),
this.taskClaiming,
this.logger
).pipe(
return claimAvailableTasks(this.taskClaiming, this.logger).pipe(
tap(
mapOk(({ timing }: ClaimOwnershipResult) => {
if (timing) {
@@ -313,7 +309,6 @@ export class TaskPollingLifecycle {
}
export function claimAvailableTasks(
claimTasksById: string[],
taskClaiming: TaskClaiming,
logger: Logger
): Observable<Result<ClaimOwnershipResult, FillPoolResult>> {
@@ -321,7 +316,6 @@ export function claimAvailableTasks(
taskClaiming
.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: intervalFromNow('30s')!,
claimTasksById,
})
.subscribe(
(claimResult) => {

View file

@@ -43,7 +43,6 @@ describe('mark_available_tasks_as_claimed', () => {
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
});
const claimTasksById = undefined;
const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
@@ -62,7 +61,6 @@ describe('mark_available_tasks_as_claimed', () => {
),
script: updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: claimTasksById || [],
claimableTaskTypes: definitions.getAllTypes(),
skippedTaskTypes: [],
unusedTaskTypes: [],
@@ -140,7 +138,7 @@ if (doc['task.runAt'].size()!=0) {
script: {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
@@ -152,15 +150,6 @@ if (doc['task.runAt'].size()!=0) {
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
@@ -173,7 +162,6 @@ if (doc['task.runAt'].size()!=0) {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById: [],
claimableTaskTypes: ['sampleTask', 'otherTask'],
skippedTaskTypes: [],
unusedTaskTypes: [],
@@ -187,79 +175,6 @@ if (doc['task.runAt'].size()!=0) {
});
describe(`script`, () => {
test('it supports claiming specific tasks by id', async () => {
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};
const claimTasksById = [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
];
expect(
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById,
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
})
).toMatchObject({
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
}`,
lang: 'painless',
params: {
now: 0,
fieldUpdates,
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
},
});
});
test('it marks the update as a noop if the type is skipped', async () => {
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
@@ -271,7 +186,6 @@ if (doc['task.runAt'].size()!=0) {
expect(
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],

View file

@@ -120,7 +120,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {
fieldUpdates: {
[field: string]: string | number | Date;
};
claimTasksById: string[];
claimableTaskTypes: string[];
skippedTaskTypes: string[];
unusedTaskTypes: string[];
@@ -129,7 +128,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {
export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
@@ -148,13 +146,11 @@ export const updateFieldsAndMarkAsFailed = ({
return {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
${setScheduledAtAndMarkAsClaimed}
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
${setScheduledAtAndMarkAsClaimed}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
@@ -164,7 +160,6 @@ export const updateFieldsAndMarkAsFailed = ({
params: {
now: new Date().getTime(),
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,

View file

@@ -67,15 +67,3 @@ export function filterDownBy(...filter: estypes.QueryDslQueryContainer[]) {
},
};
}
export function asPinnedQuery(
ids: estypes.QueryDslPinnedQuery['ids'],
organic: estypes.QueryDslPinnedQuery['organic']
): Pick<estypes.QueryDslQueryContainer, 'pinned'> {
return {
pinned: {
ids,
organic,
},
};
}

View file

@@ -8,12 +8,11 @@
import _ from 'lodash';
import uuid from 'uuid';
import { filter, take, toArray } from 'rxjs/operators';
import { some, none } from 'fp-ts/lib/Option';
import { TaskStatus, ConcreteTaskInstance } from '../task';
import { SearchOpts, StoreOpts, UpdateByQueryOpts, UpdateByQuerySearchOpts } from '../task_store';
import { asTaskClaimEvent, ClaimTaskErr, TaskClaimErrorType, TaskEvent } from '../task_events';
import { asOk, asErr } from '../lib/result_type';
import { asTaskClaimEvent, TaskEvent } from '../task_events';
import { asOk } from '../lib/result_type';
import { TaskTypeDictionary } from '../task_type_dictionary';
import type { MustNotCondition } from './query_clauses';
import { mockLogger } from '../test_utils';
@@ -393,168 +392,6 @@ if (doc['task.runAt'].size()!=0) {
]);
});
test('it supports claiming specific tasks by id', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const definitions = new TaskTypeDictionary(mockLogger());
const taskManagerId = uuid.v1();
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: new Date(Date.now()),
};
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
bar: {
title: 'bar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});
const [
{
args: {
updateByQuery: [{ query, script, sort }],
},
},
] = await testClaimAvailableTasks({
storeOpts: {
taskManagerId,
definitions,
},
taskClaimingOpts: {
maxAttempts,
},
claimingOpts: {
claimOwnershipUntil: new Date(),
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
},
});
expect(query).toMatchObject({
bool: {
must: [
{
pinned: {
ids: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
organic: {
bool: {
must: [
{
bool: {
must: [
{
term: {
'task.enabled': true,
},
},
],
},
},
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
],
},
},
},
},
],
filter: [
{
bool: {
must_not: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
must: { range: { 'task.retryAt': { gt: 'now' } } },
},
},
],
},
},
],
},
});
expect(script).toMatchObject({
source: expect.any(String),
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
bar: customMaxAttempts,
foo: maxAttempts,
},
},
});
expect(sort).toMatchObject([
'_score',
{
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'painless',
source: `
if (doc['task.retryAt'].size()!=0) {
return doc['task.retryAt'].value.toInstant().toEpochMilli();
}
if (doc['task.runAt'].size()!=0) {
return doc['task.runAt'].value.toInstant().toEpochMilli();
}
`,
},
},
},
]);
});
test('it should claim in batches partitioned by maxConcurrency', async () => {
const maxAttempts = _.random(2, 43);
const definitions = new TaskTypeDictionary(mockLogger());
@@ -618,10 +455,6 @@ if (doc['task.runAt'].size()!=0) {
},
claimingOpts: {
claimOwnershipUntil: new Date(),
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
},
});
@@ -633,10 +466,6 @@ if (doc['task.runAt'].size()!=0) {
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
claimableTaskTypes: ['unlimited', 'anotherUnlimited', 'finalUnlimited'],
skippedTaskTypes: [
'limitedToZero',
@@ -657,7 +486,6 @@ if (doc['task.runAt'].size()!=0) {
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['limitedToOne'],
skippedTaskTypes: [
'unlimited',
@@ -679,7 +507,6 @@ if (doc['task.runAt'].size()!=0) {
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['anotherLimitedToOne'],
skippedTaskTypes: [
'unlimited',
@@ -701,7 +528,6 @@ if (doc['task.runAt'].size()!=0) {
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['limitedToTwo'],
skippedTaskTypes: [
'unlimited',
@@ -799,7 +625,6 @@ if (doc['task.runAt'].size()!=0) {
],
claimingOpts: {
claimOwnershipUntil: new Date(),
claimTasksById: [],
},
});
@@ -1016,7 +841,6 @@ if (doc['task.runAt'].size()!=0) {
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: ['foobar'],
unusedTaskTypes: ['barfoo'],
@@ -1429,188 +1253,13 @@ if (doc['task.runAt'].size()!=0) {
return { taskManagerId, runAt, taskClaiming };
}
test('emits an event when a task is succesfully claimed by id', async () => {
const { taskManagerId, runAt, taskClaiming } = instantiateStoreWithMockedApiResponses();
const promise = taskClaiming.events
.pipe(
filter(
(event: TaskEvent<ConcreteTaskInstance, ClaimTaskErr>) => event.id === 'claimed-by-id'
),
take(1)
)
.toPromise();
await getFirstAsPromise(
taskClaiming.claimAvailableTasks({
claimTasksById: ['claimed-by-id'],
claimOwnershipUntil: new Date(),
})
);
const event = await promise;
expect(event).toMatchObject(
asTaskClaimEvent(
'claimed-by-id',
asOk({
id: 'claimed-by-id',
runAt,
taskType: 'foo',
schedule: undefined,
attempts: 0,
status: 'claiming' as TaskStatus,
params: { hello: 'world' },
state: { baby: 'Henhen' },
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: 'parent',
})
)
);
});
test('emits an event when a task is succesfully claimed by id by is rejected as it would exceed maxCapacity of its taskType', async () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
unlimited: {
title: 'unlimited',
createTaskRunner: jest.fn(),
},
limitedToOne: {
title: 'limitedToOne',
maxConcurrency: 1,
createTaskRunner: jest.fn(),
},
anotherLimitedToOne: {
title: 'anotherLimitedToOne',
maxConcurrency: 1,
createTaskRunner: jest.fn(),
},
});
const taskManagerId = uuid.v4();
const { runAt, taskClaiming } = instantiateStoreWithMockedApiResponses({
taskManagerId,
definitions,
getCapacity: (type) => {
switch (type) {
case 'limitedToOne':
// return 0 as there's already a `limitedToOne` task running
return 0;
default:
return 10;
}
},
tasksClaimed: [
// find on first claim cycle
[
{
id: 'claimed-by-id-limited-concurrency',
runAt: new Date(),
taskType: 'limitedToOne',
schedule: undefined,
attempts: 0,
status: TaskStatus.Claiming,
params: { hello: 'world' },
state: { baby: 'Henhen' },
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
},
],
// second cycle
[
{
id: 'claimed-by-schedule-unlimited',
runAt: new Date(),
taskType: 'unlimited',
schedule: undefined,
attempts: 0,
status: TaskStatus.Claiming,
params: { hello: 'world' },
state: { baby: 'Henhen' },
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
},
],
],
});
const promise = taskClaiming.events
.pipe(
filter(
(event: TaskEvent<ConcreteTaskInstance, ClaimTaskErr>) =>
event.id === 'claimed-by-id-limited-concurrency'
),
take(1)
)
.toPromise();
const [firstCycleResult, secondCycleResult] = await getAllAsPromise(
taskClaiming.claimAvailableTasks({
claimTasksById: ['claimed-by-id-limited-concurrency'],
claimOwnershipUntil: new Date(),
})
);
expect(firstCycleResult.stats.tasksClaimed).toEqual(0);
expect(firstCycleResult.stats.tasksRejected).toEqual(1);
expect(firstCycleResult.stats.tasksUpdated).toEqual(1);
// values accumulate from cycle to cycle
expect(secondCycleResult.stats.tasksClaimed).toEqual(0);
expect(secondCycleResult.stats.tasksRejected).toEqual(1);
expect(secondCycleResult.stats.tasksUpdated).toEqual(1);
const event = await promise;
expect(event).toMatchObject(
asTaskClaimEvent(
'claimed-by-id-limited-concurrency',
asErr({
task: some({
id: 'claimed-by-id-limited-concurrency',
runAt,
taskType: 'limitedToOne',
schedule: undefined,
attempts: 0,
status: 'claiming' as TaskStatus,
params: { hello: 'world' },
state: { baby: 'Henhen' },
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
}),
errorType: TaskClaimErrorType.CLAIMED_BY_ID_OUT_OF_CAPACITY,
})
)
);
});
test('emits an event when a task is succesfully by scheduling', async () => {
const { taskManagerId, runAt, taskClaiming } = instantiateStoreWithMockedApiResponses();
const promise = taskClaiming.events
.pipe(
filter(
(event: TaskEvent<ConcreteTaskInstance, ClaimTaskErr>) =>
event.id === 'claimed-by-schedule'
(event: TaskEvent<ConcreteTaskInstance, Error>) => event.id === 'claimed-by-schedule'
),
take(1)
)
@@ -1618,7 +1267,6 @@ if (doc['task.runAt'].size()!=0) {
await getFirstAsPromise(
taskClaiming.claimAvailableTasks({
claimTasksById: ['claimed-by-id'],
claimOwnershipUntil: new Date(),
})
);
@@ -1647,84 +1295,6 @@ if (doc['task.runAt'].size()!=0) {
)
);
});
test('emits an event when the store fails to claim a required task by id', async () => {
const { taskManagerId, runAt, taskClaiming } = instantiateStoreWithMockedApiResponses();
const promise = taskClaiming.events
.pipe(
filter(
(event: TaskEvent<ConcreteTaskInstance, ClaimTaskErr>) => event.id === 'already-running'
),
take(1)
)
.toPromise();
await getFirstAsPromise(
taskClaiming.claimAvailableTasks({
claimTasksById: ['already-running'],
claimOwnershipUntil: new Date(),
})
);
const event = await promise;
expect(event).toMatchObject(
asTaskClaimEvent(
'already-running',
asErr({
task: some({
id: 'already-running',
runAt,
taskType: 'bar',
schedule: { interval: '5m' },
attempts: 2,
status: 'running' as TaskStatus,
params: { shazm: 1 },
state: { henry: 'The 8th' },
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: taskManagerId,
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: '',
}),
errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_IN_CLAIMING_STATUS,
})
)
);
});
test('emits an event when the store fails to find a task which was required by id', async () => {
const { taskClaiming } = instantiateStoreWithMockedApiResponses();
const promise = taskClaiming.events
.pipe(
filter(
(event: TaskEvent<ConcreteTaskInstance, ClaimTaskErr>) => event.id === 'unknown-task'
),
take(1)
)
.toPromise();
await getFirstAsPromise(
taskClaiming.claimAvailableTasks({
claimTasksById: ['unknown-task'],
claimOwnershipUntil: new Date(),
})
);
const event = await promise;
expect(event).toMatchObject(
asTaskClaimEvent(
'unknown-task',
asErr({
task: none,
errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_RETURNED,
})
)
);
});
});
});

View file

@@ -12,28 +12,14 @@ import apm from 'elastic-apm-node';
import minimatch from 'minimatch';
import { Subject, Observable, from, of } from 'rxjs';
import { map, mergeScan } from 'rxjs/operators';
import { difference, partition, groupBy, mapValues, countBy, pick, isPlainObject } from 'lodash';
import { some, none } from 'fp-ts/lib/Option';
import { groupBy, pick, isPlainObject } from 'lodash';
import { Logger } from '@kbn/core/server';
import { asOk, asErr, Result } from '../lib/result_type';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import {
TaskClaim,
asTaskClaimEvent,
TaskClaimErrorType,
startTaskTimer,
TaskTiming,
} from '../task_events';
import {
shouldBeOneOf,
mustBeAllOf,
filterDownBy,
asPinnedQuery,
matchesClauses,
} from './query_clauses';
import { ConcreteTaskInstance } from '../task';
import { TaskClaim, asTaskClaimEvent, startTaskTimer, TaskTiming } from '../task_events';
import { shouldBeOneOf, mustBeAllOf, filterDownBy, matchesClauses } from './query_clauses';
import {
updateFieldsAndMarkAsFailed,
@@ -67,7 +53,6 @@ export interface TaskClaimingOpts {
export interface OwnershipClaimingOpts {
claimOwnershipUntil: Date;
claimTasksById?: string[];
size: number;
taskTypes: Set<string>;
}
@@ -87,7 +72,6 @@ export interface ClaimOwnershipResult {
tasksUpdated: number;
tasksConflicted: number;
tasksClaimed: number;
tasksRejected: number;
};
docs: ConcreteTaskInstance[];
timing?: TaskTiming;
@@ -213,7 +197,6 @@ export class TaskClaiming {
public claimAvailableTasks({
claimOwnershipUntil,
claimTasksById = [],
}: Omit<OwnershipClaimingOpts, 'size' | 'taskTypes'>): Observable<ClaimOwnershipResult> {
const initialCapacity = this.getCapacity();
return from(this.getClaimingBatches()).pipe(
@@ -231,7 +214,6 @@
return from(
this.executeClaimAvailableTasks({
claimOwnershipUntil,
claimTasksById: claimTasksById.splice(0, capacity),
size: capacity,
taskTypes: isLimited(batch) ? new Set([batch.tasksTypes]) : batch.tasksTypes,
}).then((result) => {
@@ -255,109 +237,29 @@
private executeClaimAvailableTasks = async ({
claimOwnershipUntil,
claimTasksById = [],
size,
taskTypes,
}: OwnershipClaimingOpts): Promise<ClaimOwnershipResult> => {
const claimTasksByIdWithRawIds = this.taskStore.convertToSavedObjectIds(claimTasksById);
const { updated: tasksUpdated, version_conflicts: tasksConflicted } =
await this.markAvailableTasksAsClaimed({
claimOwnershipUntil,
claimTasksById: claimTasksByIdWithRawIds,
size,
taskTypes,
});
const docs =
tasksUpdated > 0
? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, taskTypes, size)
: [];
const docs = tasksUpdated > 0 ? await this.sweepForClaimedTasks(taskTypes, size) : [];
const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) =>
claimTasksById.includes(doc.id)
);
const [documentsClaimedById, documentsRequestedButNotClaimed] = partition(
documentsReturnedById,
// we filter the schduled tasks down by status is 'claiming' in the esearch,
// but we do not apply this limitation on tasks claimed by ID so that we can
// provide more detailed error messages when we fail to claim them
(doc) => doc.status === TaskStatus.Claiming
);
// count how many tasks we've claimed by ID and validate we have capacity for them to run
const remainingCapacityOfClaimByIdByType = mapValues(
// This means we take the tasks that were claimed by their ID and count them by their type
countBy(documentsClaimedById, (doc) => doc.taskType),
(count, type) => this.getCapacity(type) - count
);
const [documentsClaimedByIdWithinCapacity, documentsClaimedByIdOutOfCapacity] = partition(
documentsClaimedById,
(doc) => {
// if we've exceeded capacity, we reject this task
if (remainingCapacityOfClaimByIdByType[doc.taskType] < 0) {
// as we're rejecting this task we can inc the count so that we know
// to keep the next one returned by ID of the same type
remainingCapacityOfClaimByIdByType[doc.taskType]++;
return false;
}
return true;
}
);
const documentsRequestedButNotReturned = difference(
claimTasksById,
documentsReturnedById.map((doc) => doc.id)
);
this.emitEvents([
...documentsClaimedByIdWithinCapacity.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))),
...documentsClaimedByIdOutOfCapacity.map((doc) =>
asTaskClaimEvent(
doc.id,
asErr({
task: some(doc),
errorType: TaskClaimErrorType.CLAIMED_BY_ID_OUT_OF_CAPACITY,
})
)
),
...documentsClaimedBySchedule.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))),
...documentsRequestedButNotClaimed.map((doc) =>
asTaskClaimEvent(
doc.id,
asErr({
task: some(doc),
errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_IN_CLAIMING_STATUS,
})
)
),
...documentsRequestedButNotReturned.map((id) =>
asTaskClaimEvent(
id,
asErr({ task: none, errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_RETURNED })
)
),
]);
this.emitEvents(docs.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))));
const stats = {
tasksUpdated,
tasksConflicted,
tasksRejected: documentsClaimedByIdOutOfCapacity.length,
tasksClaimed: documentsClaimedByIdWithinCapacity.length + documentsClaimedBySchedule.length,
tasksClaimed: docs.length,
};
if (docs.length !== stats.tasksClaimed + stats.tasksRejected) {
this.logger.warn(
`[Task Ownership error]: ${stats.tasksClaimed} tasks were claimed by Kibana, but ${
docs.length
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return {
stats,
docs: [...documentsClaimedByIdWithinCapacity, ...documentsClaimedBySchedule],
docs,
};
};
@@ -373,7 +275,6 @@ export class TaskClaiming {
private async markAvailableTasksAsClaimed({
claimOwnershipUntil,
claimTasksById,
size,
taskTypes,
}: OwnershipClaimingOpts): Promise<UpdateByQueryResult> {
@@ -392,29 +293,13 @@
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
);
// The documents should be sorted by runAt/retryAt, unless there are pinned
// tasks being queried, in which case we want to sort by score first, and then
// the runAt/retryAt. That way we'll get the pinned tasks first. Note that
// the score seems to favor newer documents rather than older documents, so
// if there are not pinned tasks being queried, we do NOT want to sort by score
// at all, just by runAt/retryAt.
const sort: NonNullable<SearchOpts['sort']> = [SortByRunAtAndRetryAt];
if (claimTasksById && claimTasksById.length) {
sort.unshift('_score');
}
const query = matchesClauses(
claimTasksById && claimTasksById.length
? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
: queryForScheduledTasks,
filterDownBy(InactiveTasks)
);
const query = matchesClauses(queryForScheduledTasks, filterDownBy(InactiveTasks));
const script = updateFieldsAndMarkAsFailed({
fieldUpdates: {
ownerId: this.taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById: claimTasksById || [],
claimableTaskTypes: taskTypesToClaim,
skippedTaskTypes: taskTypesToSkip,
unusedTaskTypes: this.unusedTypes,
@@ -449,7 +334,6 @@
* Fetches tasks from the index, which are owned by the current Kibana instance
*/
private async sweepForClaimedTasks(
claimTasksById: OwnershipClaimingOpts['claimTasksById'],
taskTypes: Set<string>,
size: number
): Promise<ConcreteTaskInstance[]> {
@@ -458,10 +342,7 @@
tasksOfType([...taskTypes])
);
const { docs } = await this.taskStore.fetch({
query:
claimTasksById && claimTasksById.length
? asPinnedQuery(claimTasksById, claimedTasksQuery)
: claimedTasksQuery,
query: claimedTasksQuery,
size,
sort: SortByRunAtAndRetryAt,
seq_no_primary_term: true,
@@ -494,7 +375,6 @@ function accumulateClaimOwnershipResults(
tasksUpdated: stats.tasksUpdated + prev.stats.tasksUpdated,
tasksConflicted: stats.tasksConflicted + prev.stats.tasksConflicted,
tasksClaimed: stats.tasksClaimed + prev.stats.tasksClaimed,
tasksRejected: stats.tasksRejected + prev.stats.tasksRejected,
},
docs,
timing,

View file

@@ -7,8 +7,6 @@
import { monitorEventLoopDelay } from 'perf_hooks';
import { Option } from 'fp-ts/lib/Option';
import { ConcreteTaskInstance } from './task';
import { Result, Err } from './lib/result_type';
@@ -34,12 +32,6 @@ export enum TaskEventType {
EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY = 'EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY',
}
export enum TaskClaimErrorType {
CLAIMED_BY_ID_OUT_OF_CAPACITY = 'CLAIMED_BY_ID_OUT_OF_CAPACITY',
CLAIMED_BY_ID_NOT_RETURNED = 'CLAIMED_BY_ID_NOT_RETURNED',
CLAIMED_BY_ID_NOT_IN_CLAIMING_STATUS = 'CLAIMED_BY_ID_NOT_IN_CLAIMING_STATUS',
}
export interface TaskTiming {
start: number;
stop: number;
@@ -82,14 +74,10 @@ export interface RanTask {
export type ErroredTask = RanTask & {
error: Error;
};
export interface ClaimTaskErr {
task: Option<ConcreteTaskInstance>;
errorType: TaskClaimErrorType;
}
export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRun = TaskEvent<RanTask, ErroredTask>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, ClaimTaskErr>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;
export type EphemeralTaskRejectedDueToCapacity = TaskEvent<EphemeralTaskInstanceRequest, Error>;
export type TaskPollingCycle<T = string> = TaskEvent<ClaimAndFillPoolResult, PollingError<T>>;
@@ -137,7 +125,7 @@ export function asTaskClaimEvent(
export function asTaskClaimEvent(
id: string,
event: Result<ConcreteTaskInstance, ClaimTaskErr>,
event: Result<ConcreteTaskInstance, Error>,
timing?: TaskTiming
): TaskClaim {
return {

View file

@@ -7,8 +7,6 @@
import { filter, take } from 'rxjs/operators';
import pMap from 'p-map';
import { pipe } from 'fp-ts/lib/pipeable';
import { getOrElse, isSome, map as mapOptional, Option } from 'fp-ts/lib/Option';
import uuid from 'uuid';
import { chunk, pick } from 'lodash';
@@ -17,9 +15,8 @@ import agent from 'elastic-apm-node';
import { Logger } from '@kbn/core/server';
import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { mustBeAllOf } from './queries/query_clauses';
import { asOk, either, isErr, map, mapErr, promiseResult } from './lib/result_type';
import { either, isErr, mapErr } from './lib/result_type';
import {
ClaimTaskErr,
ErroredTask,
ErrResultOf,
isTaskClaimEvent,
@@ -27,7 +24,6 @@ import {
isTaskRunRequestEvent,
OkResultOf,
RanTask,
TaskClaimErrorType,
} from './task_events';
import { Middleware } from './lib/middleware';
import { parseIntervalAsMillisecond } from './lib/intervals';
@@ -37,14 +33,11 @@ import {
IntervalSchedule,
TaskInstanceWithDeprecatedFields,
TaskInstanceWithId,
TaskLifecycle,
TaskLifecycleResult,
TaskStatus,
} from './task';
import { TaskStore } from './task_store';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { TaskTypeDictionary } from './task_type_dictionary';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTaskRejectedDueToCapacityError } from './task_running';
@@ -55,7 +48,6 @@ export interface TaskSchedulingOpts {
taskStore: TaskStore;
ephemeralTaskLifecycle?: EphemeralTaskLifecycle;
middleware: Middleware;
definitions: TaskTypeDictionary;
taskManagerId: string;
}
@@ -87,7 +79,6 @@ export class TaskScheduling {
private ephemeralTaskLifecycle?: EphemeralTaskLifecycle;
private logger: Logger;
private middleware: Middleware;
private definitions: TaskTypeDictionary;
private taskManagerId: string;
/**
@@ -100,7 +91,6 @@ export class TaskScheduling {
this.middleware = opts.middleware;
this.ephemeralTaskLifecycle = opts.ephemeralTaskLifecycle;
this.store = opts.taskStore;
this.definitions = opts.definitions;
this.taskManagerId = opts.taskManagerId;
}
@@ -412,27 +402,9 @@ export class TaskScheduling {
filter(({ id }: TaskLifecycleEvent) => id === taskId)
).subscribe((taskEvent: TaskLifecycleEvent) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: ClaimTaskErr) => {
mapErr(async (error: Error) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (
isSome(error.task) &&
error.errorType === TaskClaimErrorType.CLAIMED_BY_ID_OUT_OF_CAPACITY
) {
const task = error.task.value;
const definition = this.definitions.get(task.taskType);
return reject(
new Error(
`Failed to run task "${taskId}" as we would exceed the max concurrency of "${
definition?.title ?? task.taskType
}" which is ${
definition?.maxConcurrency
}. Rescheduled the task to ensure it is picked up as soon as possible.`
)
);
} else {
return reject(await this.identifyTaskFailureReason(taskId, error.task));
}
}, taskEvent.event);
} else {
either<OkResultOf<TaskLifecycleEvent>, ErrResultOf<TaskLifecycleEvent>>(
@@ -471,37 +443,6 @@ export class TaskScheduling {
});
}
private async identifyTaskFailureReason(taskId: string, error: Option<ConcreteTaskInstance>) {
return map(
await pipe(
error,
mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)),
getOrElse(() =>
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
promiseResult<TaskLifecycle, Error>(this.store.getLifecycle(taskId))
)
),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return new Error(
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
);
},
(getLifecycleError: Error) =>
new Error(
`Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}`
)
);
}
private async getNonRunningTask(taskId: string) {
const task = await this.store.get(taskId);
switch (task.status) {