[Response Ops][Task Manager] Adding background task to mark removed task types as unrecognized (#199057)

Resolves https://github.com/elastic/kibana/issues/192686

## Summary

Creates a background task to search for removed task types and mark them
as unrecognized. Removes the current logic that does this during the
task claim cycle for both task claim strategies.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2024-11-11 15:17:46 -05:00 committed by GitHub
parent 482e3f4223
commit be949d66e4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 547 additions and 556 deletions

View file

@ -29,7 +29,7 @@ import { TaskManagerConfig } from './config';
import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware';
import { removeIfExists } from './lib/remove_if_exists';
import { setupSavedObjects, BACKGROUND_TASK_NODE_SO_NAME, TASK_SO_NAME } from './saved_objects';
import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary';
import { TaskDefinitionRegistry, TaskTypeDictionary } from './task_type_dictionary';
import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store';
import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskScheduling } from './task_scheduling';
@ -45,6 +45,10 @@ import { metricsStream, Metrics } from './metrics';
import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector';
import { TaskPartitioner } from './lib/task_partitioner';
import { getDefaultCapacity } from './lib/get_default_capacity';
import {
registerMarkRemovedTasksAsUnrecognizedDefinition,
scheduleMarkRemovedTasksAsUnrecognizedDefinition,
} from './removed_tasks/mark_removed_tasks_as_unrecognized';
export interface TaskManagerSetupContract {
/**
@ -221,6 +225,11 @@ export class TaskManagerPlugin
}
registerDeleteInactiveNodesTaskDefinition(this.logger, core.getStartServices, this.definitions);
registerMarkRemovedTasksAsUnrecognizedDefinition(
this.logger,
core.getStartServices,
this.definitions
);
if (this.config.unsafe.exclude_task_types.length) {
this.logger.warn(
@ -332,7 +341,6 @@ export class TaskManagerPlugin
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
unusedTypes: REMOVED_TYPES,
logger: this.logger,
executionContext,
taskStore,
@ -384,6 +392,7 @@ export class TaskManagerPlugin
});
scheduleDeleteInactiveNodesTaskDefinition(this.logger, taskScheduling).catch(() => {});
scheduleMarkRemovedTasksAsUnrecognizedDefinition(this.logger, taskScheduling).catch(() => {});
return {
fetch: (opts: SearchOpts): Promise<FetchResult> => taskStore.fetch(opts),

View file

@ -106,7 +106,6 @@ describe('TaskPollingLifecycle', () => {
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
unusedTypes: [],
definitions: new TaskTypeDictionary(taskManagerLogger),
middleware: createInitialMiddleware(),
startingCapacity: 20,

View file

@ -55,7 +55,6 @@ export interface ITaskEventEmitter<T> {
export type TaskPollingLifecycleOpts = {
logger: Logger;
definitions: TaskTypeDictionary;
unusedTypes: string[];
taskStore: TaskStore;
config: TaskManagerConfig;
middleware: Middleware;
@ -115,7 +114,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
config,
taskStore,
definitions,
unusedTypes,
executionContext,
usageCounter,
taskPartitioner,
@ -153,7 +151,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,
unusedTypes,
logger: this.logger,
getAvailableCapacity: (taskType?: string) => this.pool.availableCapacity(taskType),
taskPartitioner,

View file

@ -70,7 +70,6 @@ describe('mark_available_tasks_as_claimed', () => {
fieldUpdates,
claimableTaskTypes: definitions.getAllTypes(),
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts };
}, {}),
@ -153,8 +152,6 @@ if (doc['task.runAt'].size()!=0) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
}`,
@ -167,7 +164,6 @@ if (doc['task.runAt'].size()!=0) {
},
claimableTaskTypes: ['sampleTask', 'otherTask'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
sampleTask: 5,
otherTask: 1,
@ -242,7 +238,6 @@ if (doc['task.runAt'].size()!=0) {
fieldUpdates,
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,

View file

@ -202,7 +202,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {
};
claimableTaskTypes: string[];
skippedTaskTypes: string[];
unusedTaskTypes: string[];
taskMaxAttempts: { [field: string]: number };
}
@ -210,7 +209,6 @@ export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
taskMaxAttempts,
}: UpdateFieldsAndMarkAsFailedOpts): ScriptClause => {
const setScheduledAtScript = `if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
@ -227,8 +225,6 @@ export const updateFieldsAndMarkAsFailed = ({
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
${setScheduledAtAndMarkAsClaimed}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
}`,
@ -238,7 +234,6 @@ export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
taskMaxAttempts,
},
};

View file

@ -83,7 +83,6 @@ describe('TaskClaiming', () => {
strategy: 'non-default',
definitions,
excludedTaskTypes: [],
unusedTypes: [],
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getAvailableCapacity: () => 10,
@ -134,7 +133,6 @@ describe('TaskClaiming', () => {
strategy: 'default',
definitions,
excludedTaskTypes: [],
unusedTypes: [],
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getAvailableCapacity: () => 10,

View file

@ -34,7 +34,6 @@ export interface TaskClaimingOpts {
logger: Logger;
strategy: string;
definitions: TaskTypeDictionary;
unusedTypes: string[];
taskStore: TaskStore;
maxAttempts: number;
excludedTaskTypes: string[];
@ -92,7 +91,6 @@ export class TaskClaiming {
private readonly taskClaimingBatchesByType: TaskClaimingBatches;
private readonly taskMaxAttempts: Record<string, number>;
private readonly excludedTaskTypes: string[];
private readonly unusedTypes: string[];
private readonly taskClaimer: TaskClaimerFn;
private readonly taskPartitioner: TaskPartitioner;
@ -111,7 +109,6 @@ export class TaskClaiming {
this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions);
this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions));
this.excludedTaskTypes = opts.excludedTaskTypes;
this.unusedTypes = opts.unusedTypes;
this.taskClaimer = getTaskClaimer(this.logger, opts.strategy);
this.events$ = new Subject<TaskClaim>();
this.taskPartitioner = opts.taskPartitioner;
@ -178,7 +175,6 @@ export class TaskClaiming {
taskStore: this.taskStore,
events$: this.events$,
getCapacity: this.getAvailableCapacity,
unusedTypes: this.unusedTypes,
definitions: this.definitions,
taskMaxAttempts: this.taskMaxAttempts,
excludedTaskTypes: this.excludedTaskTypes,

View file

@ -0,0 +1,266 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { mockLogger } from '../test_utils';
import { coreMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { SCHEDULE_INTERVAL, taskRunner } from './mark_removed_tasks_as_unrecognized';
import { SearchHit } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
const createTaskDoc = (id: string = '1'): SearchHit<unknown> => ({
_index: '.kibana_task_manager_9.0.0_001',
_id: `task:${id}`,
_score: 1,
_source: {
references: [],
type: 'task',
updated_at: '2024-11-06T14:17:55.935Z',
task: {
taskType: 'report',
params: '{}',
state: '{"foo":"test"}',
stateVersion: 1,
runAt: '2024-11-06T14:17:55.935Z',
enabled: true,
scheduledAt: '2024-11-06T14:17:55.935Z',
attempts: 0,
status: 'idle',
startedAt: null,
retryAt: null,
ownerId: null,
partition: 211,
},
},
});
describe('markRemovedTasksAsUnrecognizedTask', () => {
const logger = mockLogger();
const coreSetup = coreMock.createSetup();
const esClient = elasticsearchServiceMock.createStart();
afterEach(() => {
jest.clearAllMocks();
});
it('marks removed tasks as unrecognized', async () => {
esClient.client.asInternalUser.bulk.mockResolvedValue({
errors: false,
took: 0,
items: [
{
update: {
_index: '.kibana_task_manager_9.0.0_001',
_id: 'task:123',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
{
update: {
_index: '.kibana_task_manager_9.0.0_001',
_id: 'task:456',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
{
update: {
_index: '.kibana_task_manager_9.0.0_001',
_id: 'task:789',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
],
});
coreSetup.getStartServices.mockResolvedValue([
{
...coreMock.createStart(),
elasticsearch: esClient,
},
{},
coreMock.createSetup(),
]);
// @ts-expect-error
esClient.client.asInternalUser.search.mockResponse({
hits: { hits: [createTaskDoc('123'), createTaskDoc('456'), createTaskDoc('789')], total: 3 },
});
const runner = taskRunner(logger, coreSetup.getStartServices)();
const result = await runner.run();
expect(esClient.client.asInternalUser.bulk).toHaveBeenCalledWith({
body: [
{ update: { _id: 'task:123' } },
{ doc: { task: { status: 'unrecognized' } } },
{ update: { _id: 'task:456' } },
{ doc: { task: { status: 'unrecognized' } } },
{ update: { _id: 'task:789' } },
{ doc: { task: { status: 'unrecognized' } } },
],
index: '.kibana_task_manager',
refresh: false,
});
expect(logger.debug).toHaveBeenCalledWith(`Marked 3 removed tasks as unrecognized`);
expect(result).toEqual({
state: {},
schedule: { interval: SCHEDULE_INTERVAL },
});
});
it('skips update when there are no removed task types', async () => {
coreSetup.getStartServices.mockResolvedValue([
{
...coreMock.createStart(),
elasticsearch: esClient,
},
{},
coreMock.createSetup(),
]);
// @ts-expect-error
esClient.client.asInternalUser.search.mockResponse({
hits: { hits: [], total: 0 },
});
const runner = taskRunner(logger, coreSetup.getStartServices)();
const result = await runner.run();
expect(esClient.client.asInternalUser.bulk).not.toHaveBeenCalled();
expect(result).toEqual({
state: {},
schedule: { interval: SCHEDULE_INTERVAL },
});
});
it('schedules the next run even when there is an error', async () => {
coreSetup.getStartServices.mockResolvedValue([
{
...coreMock.createStart(),
elasticsearch: esClient,
},
{},
coreMock.createSetup(),
]);
esClient.client.asInternalUser.search.mockRejectedValueOnce(new Error('foo'));
const runner = taskRunner(logger, coreSetup.getStartServices)();
const result = await runner.run();
expect(esClient.client.asInternalUser.bulk).not.toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(
'Failed to mark removed tasks as unrecognized. Error: foo'
);
expect(result).toEqual({
state: {},
schedule: { interval: SCHEDULE_INTERVAL },
});
});
it('handles partial errors from bulk partial update', async () => {
esClient.client.asInternalUser.bulk.mockResolvedValue({
errors: false,
took: 0,
items: [
{
update: {
_index: '.kibana_task_manager_9.0.0_001',
_id: 'task:123',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
{
update: {
_index: '.kibana_task_manager_9.0.0_001',
_id: 'task:456',
_version: 2,
result: 'updated',
_shards: { total: 1, successful: 1, failed: 0 },
_seq_no: 84,
_primary_term: 1,
status: 200,
},
},
{
update: {
_index: '.kibana_task_manager_9.0.0_001',
_id: 'task:789',
_version: 2,
error: {
type: 'document_missing_exception',
reason: '[5]: document missing',
index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA',
shard: '0',
index: '.kibana_task_manager_9.0.0_001',
},
status: 404,
},
},
],
});
coreSetup.getStartServices.mockResolvedValue([
{
...coreMock.createStart(),
elasticsearch: esClient,
},
{},
coreMock.createSetup(),
]);
// @ts-expect-error
esClient.client.asInternalUser.search.mockResponse({
hits: { hits: [createTaskDoc('123'), createTaskDoc('456'), createTaskDoc('789')], total: 3 },
});
const runner = taskRunner(logger, coreSetup.getStartServices)();
const result = await runner.run();
expect(esClient.client.asInternalUser.bulk).toHaveBeenCalledWith({
body: [
{ update: { _id: 'task:123' } },
{ doc: { task: { status: 'unrecognized' } } },
{ update: { _id: 'task:456' } },
{ doc: { task: { status: 'unrecognized' } } },
{ update: { _id: 'task:789' } },
{ doc: { task: { status: 'unrecognized' } } },
],
index: '.kibana_task_manager',
refresh: false,
});
expect(logger.warn).toHaveBeenCalledWith(
`Error updating task task:789 to mark as unrecognized - {\"type\":\"document_missing_exception\",\"reason\":\"[5]: document missing\",\"index_uuid\":\"aAsFqTI0Tc2W0LCWgPNrOA\",\"shard\":\"0\",\"index\":\".kibana_task_manager_9.0.0_001\"}`
);
expect(logger.debug).toHaveBeenCalledWith(`Marked 2 removed tasks as unrecognized`);
expect(result).toEqual({
state: {},
schedule: { interval: SCHEDULE_INTERVAL },
});
});
});

View file

@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from '@kbn/logging';
import { CoreStart } from '@kbn/core-lifecycle-server';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { SearchHit } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { TaskScheduling } from '../task_scheduling';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { ConcreteTaskInstance, TaskManagerStartContract } from '..';
import { TaskStatus } from '../task';
import { REMOVED_TYPES } from '../task_type_dictionary';
import { TASK_MANAGER_INDEX } from '../constants';
export const TASK_ID = 'mark_removed_tasks_as_unrecognized';
const TASK_TYPE = `task_manager:${TASK_ID}`;
export const SCHEDULE_INTERVAL = '1h';
export async function scheduleMarkRemovedTasksAsUnrecognizedDefinition(
logger: Logger,
taskScheduling: TaskScheduling
) {
try {
await taskScheduling.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
schedule: { interval: SCHEDULE_INTERVAL },
state: {},
params: {},
});
} catch (e) {
logger.error(`Error scheduling ${TASK_ID} task, received ${e.message}`);
}
}
export function registerMarkRemovedTasksAsUnrecognizedDefinition(
logger: Logger,
coreStartServices: () => Promise<[CoreStart, TaskManagerStartContract, unknown]>,
taskTypeDictionary: TaskTypeDictionary
) {
taskTypeDictionary.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Mark removed tasks as unrecognized',
createTaskRunner: taskRunner(logger, coreStartServices),
},
});
}
export function taskRunner(
logger: Logger,
coreStartServices: () => Promise<[CoreStart, TaskManagerStartContract, unknown]>
) {
return () => {
return {
async run() {
try {
const [{ elasticsearch }] = await coreStartServices();
const esClient = elasticsearch.client.asInternalUser;
const removedTasks = await queryForRemovedTasks(esClient);
if (removedTasks.length > 0) {
await updateTasksToBeUnrecognized(esClient, logger, removedTasks);
}
return {
state: {},
schedule: { interval: SCHEDULE_INTERVAL },
};
} catch (e) {
logger.error(`Failed to mark removed tasks as unrecognized. Error: ${e.message}`);
return {
state: {},
schedule: { interval: SCHEDULE_INTERVAL },
};
}
},
};
};
}
async function queryForRemovedTasks(
esClient: ElasticsearchClient
): Promise<Array<SearchHit<ConcreteTaskInstance>>> {
const result = await esClient.search<ConcreteTaskInstance>({
index: TASK_MANAGER_INDEX,
body: {
size: 100,
_source: false,
query: {
bool: {
must: [
{
terms: {
'task.taskType': REMOVED_TYPES,
},
},
],
},
},
},
});
return result.hits.hits;
}
async function updateTasksToBeUnrecognized(
esClient: ElasticsearchClient,
logger: Logger,
removedTasks: Array<SearchHit<ConcreteTaskInstance>>
) {
const bulkBody = [];
for (const task of removedTasks) {
bulkBody.push({ update: { _id: task._id } });
bulkBody.push({ doc: { task: { status: TaskStatus.Unrecognized } } });
}
let removedCount = 0;
try {
const removeResults = await esClient.bulk({
index: TASK_MANAGER_INDEX,
refresh: false,
body: bulkBody,
});
for (const removeResult of removeResults.items) {
if (!removeResult.update || !removeResult.update._id) {
logger.warn(
`Error updating task with unknown to mark as unrecognized - malformed response`
);
} else if (removeResult.update?.error) {
logger.warn(
`Error updating task ${
removeResult.update._id
} to mark as unrecognized - ${JSON.stringify(removeResult.update.error)}`
);
} else {
removedCount++;
}
}
logger.debug(`Marked ${removedCount} removed tasks as unrecognized`);
} catch (err) {
// don't worry too much about errors, we'll try again next time
logger.warn(`Error updating tasks to mark as unrecognized: ${err}`);
}
}

View file

@ -26,7 +26,6 @@ export interface TaskClaimerOpts {
events$: Subject<TaskClaim>;
taskStore: TaskStore;
definitions: TaskTypeDictionary;
unusedTypes: string[];
excludedTaskTypes: string[];
taskMaxAttempts: Record<string, number>;
logger: Logger;

View file

@ -190,7 +190,6 @@ describe('TaskClaiming', () => {
definitions,
taskStore: store,
excludedTaskTypes,
unusedTypes: unusedTaskTypes,
maxAttempts: taskClaimingOpts.maxAttempts ?? 2,
getAvailableCapacity: taskClaimingOpts.getAvailableCapacity ?? (() => 10),
taskPartitioner,
@ -206,20 +205,17 @@ describe('TaskClaiming', () => {
claimingOpts,
hits = [generateFakeTasks(1)],
excludedTaskTypes = [],
unusedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
claimingOpts: Omit<OwnershipClaimingOpts, 'size' | 'taskTypes'>;
hits?: ConcreteTaskInstance[][];
excludedTaskTypes?: string[];
unusedTaskTypes?: string[];
}) {
const { taskClaiming, store } = initialiseTestClaiming({
storeOpts,
taskClaimingOpts,
excludedTaskTypes,
unusedTaskTypes,
hits,
});
@ -355,7 +351,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -378,7 +373,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
@ -440,312 +435,6 @@ describe('TaskClaiming', () => {
expect(result.docs.length).toEqual(3);
});
test('should not claim tasks of removed type', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
];
const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk));
store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult));
store.bulkPartialUpdate.mockResolvedValueOnce(
[fetchedTasks[0], fetchedTasks[1]].map(getPartialUpdateResult)
);
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
strategy: CLAIM_STRATEGY_MGET,
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: ['report'],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
});
const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: new Date(),
});
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
expect(resultOrErr).toBe(undefined);
}
const result = unwrap(resultOrErr) as ClaimOwnershipResult;
expect(apm.startTransaction).toHaveBeenCalledWith(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 2;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
size: 40,
seq_no_primary_term: true,
});
expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']);
expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2);
expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [
{
id: fetchedTasks[2].id,
version: fetchedTasks[2].version,
scheduledAt: fetchedTasks[2].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
]);
expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [
{
id: fetchedTasks[0].id,
version: fetchedTasks[0].version,
status: 'unrecognized',
},
{
id: fetchedTasks[1].id,
version: fetchedTasks[1].version,
status: 'unrecognized',
},
]);
expect(store.bulkGet).toHaveBeenCalledWith(['id-3']);
expect(result.stats).toEqual({
tasksClaimed: 1,
tasksConflicted: 0,
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
test('should log warning if error updating single removed task as unrecognized', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
];
const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk));
store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult));
store.bulkPartialUpdate.mockResolvedValueOnce([
asOk(fetchedTasks[0]),
asErr({
type: 'task',
id: fetchedTasks[1].id,
status: 404,
error: {
type: 'document_missing_exception',
reason: '[5]: document missing',
index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA',
shard: '0',
index: '.kibana_task_manager_8.16.0_001',
},
}),
]);
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
strategy: CLAIM_STRATEGY_MGET,
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: ['report'],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
});
const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: new Date(),
});
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
expect(resultOrErr).toBe(undefined);
}
const result = unwrap(resultOrErr) as ClaimOwnershipResult;
expect(apm.startTransaction).toHaveBeenCalledWith(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Error updating task id-2:task to mark as unrecognized during claim: {"type":"document_missing_exception","reason":"[5]: document missing","index_uuid":"aAsFqTI0Tc2W0LCWgPNrOA","shard":"0","index":".kibana_task_manager_8.16.0_001"}',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 1;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
size: 40,
seq_no_primary_term: true,
});
expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']);
expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2);
expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [
{
id: fetchedTasks[2].id,
version: fetchedTasks[2].version,
scheduledAt: fetchedTasks[2].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
]);
expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [
{
id: fetchedTasks[0].id,
version: fetchedTasks[0].version,
status: 'unrecognized',
},
{
id: fetchedTasks[1].id,
version: fetchedTasks[1].version,
status: 'unrecognized',
},
]);
expect(store.bulkGet).toHaveBeenCalledWith(['id-3']);
expect(result.stats).toEqual({
tasksClaimed: 1,
tasksConflicted: 0,
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
test('should log warning if error updating all removed tasks as unrecognized', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
];
const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk));
store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult));
store.bulkPartialUpdate.mockRejectedValueOnce(new Error('Oh no'));
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
strategy: CLAIM_STRATEGY_MGET,
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: ['report'],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
});
const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: new Date(),
});
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
expect(resultOrErr).toBe(undefined);
}
const result = unwrap(resultOrErr) as ClaimOwnershipResult;
expect(apm.startTransaction).toHaveBeenCalledWith(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Error updating tasks to mark as unrecognized during claim: Error: Oh no',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
size: 40,
seq_no_primary_term: true,
});
expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']);
expect(store.bulkGet).toHaveBeenCalledWith(['id-3']);
expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2);
expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [
{
id: fetchedTasks[2].id,
version: fetchedTasks[2].version,
scheduledAt: fetchedTasks[2].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
]);
expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [
{
id: fetchedTasks[0].id,
version: fetchedTasks[0].version,
status: 'unrecognized',
},
{
id: fetchedTasks[1].id,
version: fetchedTasks[1].version,
status: 'unrecognized',
},
]);
expect(result.stats).toEqual({
tasksClaimed: 1,
tasksConflicted: 0,
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
test('should handle no tasks to claim', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
@ -761,7 +450,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -828,7 +516,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -851,7 +538,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
@ -922,7 +609,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -945,7 +631,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
@ -1016,7 +702,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -1039,7 +724,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
@ -1116,7 +801,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -1139,7 +823,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
@ -1248,7 +932,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -1271,7 +954,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.error).toHaveBeenCalledWith(
@ -1377,7 +1060,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -1400,7 +1082,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
@ -1504,7 +1186,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -1619,7 +1300,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -1642,7 +1322,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.error).toHaveBeenCalledWith(
@ -1753,7 +1433,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -1776,7 +1455,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.error).not.toHaveBeenCalled();
@ -1870,7 +1549,6 @@ describe('TaskClaiming', () => {
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
@ -2488,7 +2166,6 @@ describe('TaskClaiming', () => {
strategy: CLAIM_STRATEGY_MGET,
definitions,
excludedTaskTypes: [],
unusedTypes: [],
taskStore,
maxAttempts: 2,
getAvailableCapacity,

View file

@ -57,7 +57,6 @@ interface OwnershipClaimingOpts {
claimOwnershipUntil: Date;
size: number;
taskTypes: Set<string>;
removedTypes: Set<string>;
getCapacity: (taskType?: string | undefined) => number;
excludedTaskTypePatterns: string[];
taskStore: TaskStore;
@ -90,19 +89,16 @@ export async function claimAvailableTasksMget(
async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershipResult> {
const { getCapacity, claimOwnershipUntil, batches, events$, taskStore, taskPartitioner } = opts;
const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts;
const { definitions, excludedTaskTypes, taskMaxAttempts } = opts;
const logger = createWrappedLogger({ logger: opts.logger, tags: [claimAvailableTasksMget.name] });
const initialCapacity = getCapacity();
const stopTaskTimer = startTaskTimer();
const removedTypes = new Set(unusedTypes); // REMOVED_TYPES
// get a list of candidate tasks to claim, with their version info
const { docs, versionMap } = await searchAvailableTasks({
definitions,
taskTypes: new Set(definitions.getAllTypes()),
excludedTaskTypePatterns: excludedTaskTypes,
removedTypes,
taskStore,
events$,
claimOwnershipUntil,
@ -125,18 +121,12 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
// use mget to get the latest version of each task
const docLatestVersions = await taskStore.getDocVersions(docs.map((doc) => `task:${doc.id}`));
// filter out stale, missing and removed tasks
// filter out stale and missing tasks
const currentTasks: ConcreteTaskInstance[] = [];
const staleTasks: ConcreteTaskInstance[] = [];
const missingTasks: ConcreteTaskInstance[] = [];
const removedTasks: ConcreteTaskInstance[] = [];
for (const searchDoc of docs) {
if (removedTypes.has(searchDoc.taskType)) {
removedTasks.push(searchDoc);
continue;
}
const searchVersion = versionMap.get(searchDoc.id);
const latestVersion = docLatestVersions.get(`task:${searchDoc.id}`);
if (!searchVersion || !latestVersion) {
@ -236,42 +226,8 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
return acc;
}, []);
// separate update for removed tasks; shouldn't happen often, so unlikely
// a performance concern, and keeps the rest of the logic simpler
let removedCount = 0;
if (removedTasks.length > 0) {
const tasksToRemove = Array.from(removedTasks);
const tasksToRemoveUpdates: PartialConcreteTaskInstance[] = [];
for (const task of tasksToRemove) {
tasksToRemoveUpdates.push({
id: task.id,
status: TaskStatus.Unrecognized,
});
}
// don't worry too much about errors, we'll get them next time
try {
const removeResults = await taskStore.bulkPartialUpdate(tasksToRemoveUpdates);
for (const removeResult of removeResults) {
if (isOk(removeResult)) {
removedCount++;
} else {
const { id, type, error } = removeResult.error;
logger.warn(
`Error updating task ${id}:${type} to mark as unrecognized during claim: ${JSON.stringify(
error
)}`
);
}
}
} catch (err) {
// swallow the error because this is unrelated to the claim cycle
logger.warn(`Error updating tasks to mark as unrecognized during claim: ${err}`);
}
}
// TODO: need a better way to generate stats
const message = `task claimer claimed: ${fullTasksToRun.length}; stale: ${staleTasks.length}; conflicts: ${conflicts}; missing: ${missingTasks.length}; capacity reached: ${leftOverTasks.length}; updateErrors: ${bulkUpdateErrors}; getErrors: ${bulkGetErrors}; removed: ${removedCount};`;
const message = `task claimer claimed: ${fullTasksToRun.length}; stale: ${staleTasks.length}; conflicts: ${conflicts}; missing: ${missingTasks.length}; capacity reached: ${leftOverTasks.length}; updateErrors: ${bulkUpdateErrors}; getErrors: ${bulkGetErrors};`;
logger.debug(message);
// build results
@ -306,7 +262,6 @@ export const NO_ASSIGNED_PARTITIONS_WARNING_INTERVAL = 60000;
async function searchAvailableTasks({
definitions,
taskTypes,
removedTypes,
excludedTaskTypePatterns,
taskStore,
getCapacity,
@ -318,7 +273,6 @@ async function searchAvailableTasks({
const claimPartitions = buildClaimPartitions({
types: taskTypes,
excludedTaskTypes,
removedTypes,
getCapacity,
definitions,
});
@ -352,10 +306,7 @@ async function searchAvailableTasks({
// Task must be enabled
EnabledTask,
// a task type that's not excluded (may be removed or not)
OneOfTaskTypes(
'task.taskType',
claimPartitions.unlimitedTypes.concat(Array.from(removedTypes))
),
OneOfTaskTypes('task.taskType', claimPartitions.unlimitedTypes),
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
@ -407,7 +358,6 @@ async function searchAvailableTasks({
}
interface ClaimPartitions {
removedTypes: string[];
unlimitedTypes: string[];
limitedTypes: Map<string, number>;
}
@ -415,30 +365,23 @@ interface ClaimPartitions {
interface BuildClaimPartitionsOpts {
types: Set<string>;
excludedTaskTypes: Set<string>;
removedTypes: Set<string>;
getCapacity: (taskType?: string) => number;
definitions: TaskTypeDictionary;
}
function buildClaimPartitions(opts: BuildClaimPartitionsOpts): ClaimPartitions {
const result: ClaimPartitions = {
removedTypes: [],
unlimitedTypes: [],
limitedTypes: new Map(),
};
const { types, excludedTaskTypes, removedTypes, getCapacity, definitions } = opts;
const { types, excludedTaskTypes, getCapacity, definitions } = opts;
for (const type of types) {
const definition = definitions.get(type);
if (definition == null) continue;
if (excludedTaskTypes.has(type)) continue;
if (removedTypes.has(type)) {
result.removedTypes.push(type);
continue;
}
if (definition.maxConcurrency == null) {
result.unlimitedTypes.push(definition.type);
continue;

View file

@ -99,14 +99,12 @@ describe('TaskClaiming', () => {
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
unusedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
unusedTaskTypes?: string[];
}) {
const definitions = storeOpts.definitions ?? taskDefinitions;
const store = taskStoreMock.create({ taskManagerId: storeOpts.taskManagerId });
@ -136,7 +134,6 @@ describe('TaskClaiming', () => {
definitions,
taskStore: store,
excludedTaskTypes,
unusedTypes: unusedTaskTypes,
maxAttempts: taskClaimingOpts.maxAttempts ?? 2,
getAvailableCapacity: taskClaimingOpts.getAvailableCapacity ?? (() => 10),
taskPartitioner,
@ -153,7 +150,6 @@ describe('TaskClaiming', () => {
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
unusedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
@ -161,14 +157,12 @@ describe('TaskClaiming', () => {
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
unusedTaskTypes?: string[];
}) {
const getCapacity = taskClaimingOpts.getAvailableCapacity ?? (() => 10);
const { taskClaiming, store } = initialiseTestClaiming({
storeOpts,
taskClaimingOpts,
excludedTaskTypes,
unusedTaskTypes,
hits,
versionConflicts,
});
@ -471,7 +465,6 @@ if (doc['task.runAt'].size()!=0) {
'anotherLimitedToOne',
'limitedToTwo',
],
unusedTaskTypes: [],
taskMaxAttempts: {
unlimited: maxAttempts,
},
@ -493,7 +486,6 @@ if (doc['task.runAt'].size()!=0) {
'anotherLimitedToOne',
'limitedToTwo',
],
unusedTaskTypes: [],
taskMaxAttempts: {
limitedToOne: maxAttempts,
},
@ -640,7 +632,6 @@ if (doc['task.runAt'].size()!=0) {
},
taskPartitioner,
excludedTaskTypes: [],
unusedTypes: [],
});
const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({
@ -848,129 +839,6 @@ if (doc['task.runAt'].size()!=0) {
expect(firstCycle).not.toMatchObject(secondCycle);
});
test('it passes any unusedTaskTypes to script', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const taskManagerId = uuidv1();
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: new Date(Date.now()),
};
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
bar: {
title: 'bar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
foobar: {
title: 'foobar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});
const {
args: {
updateByQuery: [{ query, script }],
},
} = await testClaimAvailableTasks({
storeOpts: {
definitions,
taskManagerId,
},
taskClaimingOpts: {
maxAttempts,
},
claimingOpts: {
claimOwnershipUntil: new Date(),
},
excludedTaskTypes: ['foobar'],
unusedTaskTypes: ['barfoo'],
});
expect(query).toMatchObject({
bool: {
must: [
{
bool: {
must: [
{
term: {
'task.enabled': true,
},
},
],
},
},
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
],
filter: [
{
bool: {
must_not: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
must: { range: { 'task.retryAt': { gt: 'now' } } },
},
},
],
},
},
],
},
});
expect(script).toMatchObject({
source: expect.any(String),
lang: 'painless',
params: {
fieldUpdates,
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: ['foobar'],
unusedTaskTypes: ['barfoo'],
taskMaxAttempts: {
bar: customMaxAttempts,
foo: maxAttempts,
},
},
});
});
test('it claims tasks by setting their ownerId, status and retryAt', async () => {
const taskManagerId = uuidv1();
const claimOwnershipUntil = new Date(Date.now());
@ -1356,7 +1224,6 @@ if (doc['task.runAt'].size()!=0) {
strategy: 'update_by_query',
definitions,
excludedTaskTypes: [],
unusedTypes: [],
taskStore,
maxAttempts: 2,
getAvailableCapacity,

View file

@ -51,7 +51,6 @@ interface OwnershipClaimingOpts {
taskStore: TaskStore;
events$: Subject<TaskClaim>;
definitions: TaskTypeDictionary;
unusedTypes: string[];
excludedTaskTypes: string[];
taskMaxAttempts: Record<string, number>;
}
@ -60,7 +59,7 @@ export async function claimAvailableTasksUpdateByQuery(
opts: TaskClaimerOpts
): Promise<ClaimOwnershipResult> {
const { getCapacity, claimOwnershipUntil, batches, events$, taskStore } = opts;
const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts;
const { definitions, excludedTaskTypes, taskMaxAttempts } = opts;
const initialCapacity = getCapacity();
let accumulatedResult = getEmptyClaimOwnershipResult();
@ -83,7 +82,6 @@ export async function claimAvailableTasksUpdateByQuery(
taskTypes: isLimited(batch) ? new Set([batch.tasksTypes]) : batch.tasksTypes,
taskStore,
definitions,
unusedTypes,
excludedTaskTypes,
taskMaxAttempts,
});
@ -137,7 +135,6 @@ async function markAvailableTasksAsClaimed({
claimOwnershipUntil,
size,
taskTypes,
unusedTypes,
taskMaxAttempts,
}: OwnershipClaimingOpts): Promise<UpdateByQueryResult> {
const { taskTypesToSkip = [], taskTypesToClaim = [] } = groupBy(
@ -164,7 +161,6 @@ async function markAvailableTasksAsClaimed({
},
claimableTaskTypes: taskTypesToClaim,
skippedTaskTypes: taskTypesToSkip,
unusedTaskTypes: unusedTypes,
taskMaxAttempts: pick(taskMaxAttempts, taskTypesToClaim),
});

View file

@ -27,7 +27,8 @@
"@kbn/logging",
"@kbn/core-lifecycle-server",
"@kbn/cloud-plugin",
"@kbn/core-saved-objects-base-server-internal"
"@kbn/core-saved-objects-base-server-internal",
"@kbn/core-elasticsearch-server",
],
"exclude": ["target/**/*"]
}

View file

@ -95,6 +95,34 @@ export class SampleTaskManagerFixturePlugin
return res.ok({ body: {} });
}
);
router.post(
{
path: '/api/alerting_tasks/run_mark_tasks_as_unrecognized',
validate: {
body: schema.object({}),
},
},
async (
context: RequestHandlerContext,
req: KibanaRequest<any, any, any, any>,
res: KibanaResponseFactory
): Promise<IKibanaResponse<any>> => {
try {
const taskManager = await this.taskManagerStart;
await taskManager.ensureScheduled({
id: 'mark_removed_tasks_as_unrecognized',
taskType: 'task_manager:mark_removed_tasks_as_unrecognized',
schedule: { interval: '1h' },
state: {},
params: {},
});
return res.ok({ body: await taskManager.runSoon('mark_removed_tasks_as_unrecognized') });
} catch (err) {
return res.ok({ body: { id: 'mark_removed_tasks_as_unrecognized', error: `${err}` } });
}
}
);
}
public start(core: CoreStart, { taskManager }: SampleTaskManagerFixtureStartDeps) {

View file

@ -138,6 +138,12 @@ export default function createScheduledTaskIdTests({ getService }: FtrProviderCo
// When we enable the rule, the unrecognized task should be removed and a new
// task created in its place
await supertestWithoutAuth
.post('/api/alerting_tasks/run_mark_tasks_as_unrecognized')
.set('kbn-xsrf', 'foo')
.send({})
.expect(200);
// scheduled task should exist and be unrecognized
await retry.try(async () => {
const taskRecordLoaded = await getScheduledTask(RULE_ID);

View file

@ -114,6 +114,34 @@ export function initRoutes(
}
);
router.post(
{
path: `/api/sample_tasks/run_mark_removed_tasks_as_unrecognized`,
validate: {
body: schema.object({}),
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<any, any, any, any>,
res: KibanaResponseFactory
): Promise<IKibanaResponse<any>> {
try {
const taskManager = await taskManagerStart;
await taskManager.ensureScheduled({
id: 'mark_removed_tasks_as_unrecognized',
taskType: 'task_manager:mark_removed_tasks_as_unrecognized',
schedule: { interval: '1h' },
state: {},
params: {},
});
return res.ok({ body: await taskManager.runSoon('mark_removed_tasks_as_unrecognized') });
} catch (err) {
return res.ok({ body: { id: 'mark_removed_tasks_as_unrecognized', error: `${err}` } });
}
}
);
router.post(
{
path: `/api/sample_tasks/bulk_enable`,

View file

@ -168,6 +168,7 @@ export default function ({ getService }: FtrProviderContext) {
'security:telemetry-timelines',
'session_cleanup',
'task_manager:delete_inactive_background_task_nodes',
'task_manager:mark_removed_tasks_as_unrecognized',
]);
});
});

View file

@ -92,6 +92,12 @@ export default function ({ getService }: FtrProviderContext) {
let scheduledTaskRuns = 0;
let scheduledTaskInstanceRunAt = scheduledTask.runAt;
await request
.post('/api/sample_tasks/run_mark_removed_tasks_as_unrecognized')
.set('kbn-xsrf', 'xxx')
.send({})
.expect(200);
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(tasks.length).to.eql(3);

View file

@ -117,6 +117,34 @@ export function initRoutes(
}
);
router.post(
{
path: `/api/sample_tasks/run_mark_removed_tasks_as_unrecognized`,
validate: {
body: schema.object({}),
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<any, any, any, any>,
res: KibanaResponseFactory
): Promise<IKibanaResponse<any>> {
try {
const taskManager = await taskManagerStart;
await taskManager.ensureScheduled({
id: 'mark_removed_tasks_as_unrecognized',
taskType: 'task_manager:mark_removed_tasks_as_unrecognized',
schedule: { interval: '1h' },
state: {},
params: {},
});
return res.ok({ body: await taskManager.runSoon('mark_removed_tasks_as_unrecognized') });
} catch (err) {
return res.ok({ body: { id: 'mark_removed_tasks_as_unrecognized', error: `${err}` } });
}
}
);
router.post(
{
path: `/api/sample_tasks/bulk_enable`,

View file

@ -92,6 +92,12 @@ export default function ({ getService }: FtrProviderContext) {
let scheduledTaskRuns = 0;
let scheduledTaskInstanceRunAt = scheduledTask.runAt;
await request
.post('/api/sample_tasks/run_mark_removed_tasks_as_unrecognized')
.set('kbn-xsrf', 'xxx')
.send({})
.expect(200);
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(tasks.length).to.eql(3);