Fix issues around enabling and disabling tasks (#148985)

Resolves https://github.com/elastic/kibana/issues/148914
Resolves https://github.com/elastic/kibana/issues/149090
Resolves https://github.com/elastic/kibana/issues/149091
Resolves https://github.com/elastic/kibana/issues/149092

In this PR, I'm making the following Task Manager bulk APIs retry
whenever conflicts are encountered: `bulkEnable`, `bulkDisable`, and
`bulkUpdateSchedules`.

To accomplish this, the following had to be done:
- Revert the original PR (https://github.com/elastic/kibana/pull/147808)
because the retries didn't load the updated documents whenever version
conflicts were encountered and the approach had to be redesigned.
- Create a `retryableBulkUpdate` function that can be re-used among the
bulk APIs.
- Fix a bug in `task_store.ts` where `version` field wasn't passed
through properly (no type safety for some reason)
- Remove `entity` from being returned on bulk update errors. This helped
re-use the same response structure when objects weren't found
- Create a `bulkGet` API on the task store so we get the latest
documents prior to an ES refresh happening
- Create a single mock task function that mocks task manager tasks for
unit test purposes. This was necessary as other places were doing `as
unknown as BulkUpdateTaskResult` and escaping type safety

Flaky test runs:
- [Framework]
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/1776
- [Kibana Security]
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/1786

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Mike Côté 2023-01-26 12:23:30 -05:00 committed by GitHub
parent f78236a2e4
commit f49c3ac7bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 718 additions and 477 deletions

View file

@ -229,7 +229,7 @@ const tryToDisableTasks = async ({
if (resultFromDisablingTasks.errors.length) {
logger.error(
`Failure to disable schedules for underlying tasks: ${resultFromDisablingTasks.errors
.map((error) => error.task.id)
.map((error) => error.id)
.join(', ')}`
);
}

View file

@ -253,7 +253,7 @@ export const tryToEnableTasks = async ({
async () => taskManager.bulkEnable(taskIdsToEnable)
);
resultFromEnablingTasks?.errors?.forEach((error) => {
taskIdsFailedToBeEnabled.push(error.task.id);
taskIdsFailedToBeEnabled.push(error.id);
});
if (resultFromEnablingTasks.tasks.length) {
logger.debug(
@ -265,7 +265,7 @@ export const tryToEnableTasks = async ({
if (resultFromEnablingTasks.errors.length) {
logger.error(
`Failure to enable schedules for underlying tasks: ${resultFromEnablingTasks.errors
.map((error) => error.task.id)
.map((error) => error.id)
.join(', ')}`
);
}

View file

@ -352,10 +352,11 @@ describe('bulkDisableRules', () => {
});
taskManager.bulkDisable.mockResolvedValue({
tasks: [{ id: 'id1' }],
tasks: [taskManagerMock.createTask({ id: 'id1' })],
errors: [
{
task: { id: 'id2' },
type: 'task',
id: 'id2',
error: {
error: '',
message: 'UPS',
@ -363,7 +364,7 @@ describe('bulkDisableRules', () => {
},
},
],
} as unknown as BulkUpdateTaskResult);
});
await rulesClient.bulkDisableRules({ filter: 'fake_filter' });

View file

@ -383,24 +383,20 @@ describe('bulkEnableRules', () => {
unsecuredSavedObjectsClient.bulkCreate.mockResolvedValue({
saved_objects: [enabledRule1, enabledRule2],
});
taskManager.bulkEnable.mockImplementation(
async () =>
({
tasks: [{ id: 'id1' }],
errors: [
{
task: {
id: 'id2',
},
error: {
error: '',
message: 'UPS',
statusCode: 500,
},
},
],
} as unknown as BulkUpdateTaskResult)
);
taskManager.bulkEnable.mockImplementation(async () => ({
tasks: [taskManagerMock.createTask({ id: 'id1' })],
errors: [
{
type: 'task',
id: 'id2',
error: {
error: '',
message: 'UPS',
statusCode: 500,
},
},
],
}));
const result = await rulesClient.bulkEnableRules({ filter: 'fake_filter' });

View file

@ -5,11 +5,10 @@
* 2.0.
*/
import { v4 as uuidv4 } from 'uuid';
import { taskStoreMock } from './task_store.mock';
import { BufferedTaskStore } from './buffered_task_store';
import { asErr, asOk } from './lib/result_type';
import { TaskStatus } from './task';
import { taskManagerMock } from './mocks';
describe('Buffered Task Store', () => {
test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
@ -26,7 +25,7 @@ describe('Buffered Task Store', () => {
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});
const task = mockTask();
const task = taskManagerMock.createTask();
taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);
@ -38,11 +37,23 @@ describe('Buffered Task Store', () => {
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});
const tasks = [mockTask(), mockTask(), mockTask()];
const tasks = [
taskManagerMock.createTask(),
taskManagerMock.createTask({ id: 'task_7c149afd-6250-4ca5-a314-20af1348d5e9' }),
taskManagerMock.createTask(),
];
taskStore.bulkUpdate.mockResolvedValueOnce([
asOk(tasks[0]),
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
asErr({
type: 'task',
id: tasks[1].id,
error: {
statusCode: 400,
error: 'Oh no, something went terribly wrong',
message: 'Oh no, something went terribly wrong',
},
}),
asOk(tasks[2]),
]);
@ -52,9 +63,17 @@ describe('Buffered Task Store', () => {
bufferedStore.update(tasks[2]),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(
`[Error: Oh no, something went terribly wrong]`
);
expect(results[1]).rejects.toMatchInlineSnapshot(`
Object {
"error": Object {
"error": "Oh no, something went terribly wrong",
"message": "Oh no, something went terribly wrong",
"statusCode": 400,
},
"id": "task_7c149afd-6250-4ca5-a314-20af1348d5e9",
"type": "task",
}
`);
expect(await results[2]).toMatchObject(tasks[2]);
});
@ -62,17 +81,25 @@ describe('Buffered Task Store', () => {
const taskStore = taskStoreMock.create();
const bufferedStore = new BufferedTaskStore(taskStore, {});
const duplicateIdTask = mockTask();
const duplicateIdTask = taskManagerMock.createTask();
const tasks = [
duplicateIdTask,
mockTask(),
mockTask(),
{ ...mockTask(), id: duplicateIdTask.id },
taskManagerMock.createTask({ id: 'task_16748083-bc28-4599-893b-c8ec16e55c10' }),
taskManagerMock.createTask(),
taskManagerMock.createTask({ id: duplicateIdTask.id }),
];
taskStore.bulkUpdate.mockResolvedValueOnce([
asOk(tasks[0]),
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
asErr({
type: 'task',
id: tasks[1].id,
error: {
statusCode: 400,
error: 'Oh no, something went terribly wrong',
message: 'Oh no, something went terribly wrong',
},
}),
asOk(tasks[2]),
asOk(tasks[3]),
]);
@ -84,31 +111,19 @@ describe('Buffered Task Store', () => {
bufferedStore.update(tasks[3]),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(
`[Error: Oh no, something went terribly wrong]`
);
expect(results[1]).rejects.toMatchInlineSnapshot(`
Object {
"error": Object {
"error": "Oh no, something went terribly wrong",
"message": "Oh no, something went terribly wrong",
"statusCode": 400,
},
"id": "task_16748083-bc28-4599-893b-c8ec16e55c10",
"type": "task",
}
`);
expect(await results[2]).toMatchObject(tasks[2]);
expect(await results[3]).toMatchObject(tasks[3]);
});
});
});
// Test helper: fabricates a ConcreteTaskInstance-like object with fixed defaults.
function mockTask() {
  const task = {
    id: `task_${uuidv4()}`,
    taskType: 'report',
    status: TaskStatus.Idle,
    attempts: 0,
    params: { hello: 'world' },
    state: { foo: 'bar' },
    schedule: undefined,
    runAt: new Date(),
    scheduledAt: new Date(),
    startedAt: null,
    retryAt: null,
    scope: undefined,
    user: undefined,
    ownerId: '123',
    version: '123',
  };
  return task;
}

View file

@ -15,15 +15,12 @@ import { unwrapPromise } from './lib/result_type';
const DEFAULT_BUFFER_MAX_DURATION = 50;
export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;
private bufferedUpdate: Operation<ConcreteTaskInstance>;
constructor(private readonly taskStore: TaskStore, options: BufferOptions) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>(
(docs) => taskStore.bulkUpdate(docs),
{
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
}
);
this.bufferedUpdate = createBuffer<ConcreteTaskInstance>((docs) => taskStore.bulkUpdate(docs), {
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
});
}
public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {

View file

@ -14,13 +14,13 @@ import { mockLogger } from './test_utils';
import { asErr, asOk } from './lib/result_type';
import { FillPoolResult } from './lib/fill_pool';
import { EphemeralTaskLifecycle, EphemeralTaskLifecycleOpts } from './ephemeral_task_lifecycle';
import { ConcreteTaskInstance, TaskStatus } from './task';
import { v4 as uuidv4 } from 'uuid';
import { asTaskPollingCycleEvent, asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskRunResult } from './task_running';
import { TaskPoolRunResult } from './task_pool';
import { TaskPoolMock } from './task_pool.mock';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from './mocks';
const executionContext = executionContextServiceMock.createSetupContract();
@ -107,7 +107,7 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = mockTask();
const task = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asErr(task));
});
@ -116,7 +116,7 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = mockTask();
const task = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));
});
@ -127,12 +127,12 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = mockTask();
const task = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));
const task2 = mockTask();
const task2 = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(task2)).toMatchObject(asOk(task2));
const rejectedTask = mockTask();
const rejectedTask = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(rejectedTask)).toMatchObject(asErr(rejectedTask));
});
@ -141,7 +141,7 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = mockTask({ id: `my-phemeral-task` });
const task = taskManagerMock.createTask({ id: `my-phemeral-task` });
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));
poolCapacity.mockReturnValue({
@ -164,7 +164,7 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = mockTask({ id: `my-phemeral-task` });
const task = taskManagerMock.createTask({ id: `my-phemeral-task` });
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));
poolCapacity.mockReturnValue({
@ -175,7 +175,7 @@ describe('EphemeralTaskLifecycle', () => {
asTaskRunEvent(
uuidv4(),
asOk({
task: mockTask(),
task: taskManagerMock.createTask(),
result: TaskRunResult.Success,
persistence: TaskPersistence.Ephemeral,
})
@ -194,7 +194,11 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const tasks = [mockTask(), mockTask(), mockTask()];
const tasks = [
taskManagerMock.createTask(),
taskManagerMock.createTask(),
taskManagerMock.createTask(),
];
expect(ephemeralTaskLifecycle.attemptToRun(tasks[0])).toMatchObject(asOk(tasks[0]));
expect(ephemeralTaskLifecycle.attemptToRun(tasks[1])).toMatchObject(asOk(tasks[1]));
expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2]));
@ -228,8 +232,8 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const firstLimitedTask = mockTask({ taskType: 'report' });
const secondLimitedTask = mockTask({ taskType: 'report' });
const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
// both are queued
expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject(
asOk(firstLimitedTask)
@ -268,8 +272,8 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const firstLimitedTask = mockTask({ taskType: 'report' });
const secondLimitedTask = mockTask({ taskType: 'report' });
const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
// both are queued
expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject(
asOk(firstLimitedTask)
@ -317,17 +321,21 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const fooTasks = [mockTask(), mockTask(), mockTask()];
const fooTasks = [
taskManagerMock.createTask(),
taskManagerMock.createTask(),
taskManagerMock.createTask(),
];
expect(ephemeralTaskLifecycle.attemptToRun(fooTasks[0])).toMatchObject(asOk(fooTasks[0]));
const firstLimitedTask = mockTask({ taskType: 'report' });
const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject(
asOk(firstLimitedTask)
);
expect(ephemeralTaskLifecycle.attemptToRun(fooTasks[1])).toMatchObject(asOk(fooTasks[1]));
const secondLimitedTask = mockTask({ taskType: 'report' });
const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
expect(ephemeralTaskLifecycle.attemptToRun(secondLimitedTask)).toMatchObject(
asOk(secondLimitedTask)
);
@ -358,7 +366,11 @@ describe('EphemeralTaskLifecycle', () => {
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const tasks = [mockTask(), mockTask(), mockTask()];
const tasks = [
taskManagerMock.createTask(),
taskManagerMock.createTask(),
taskManagerMock.createTask(),
];
expect(ephemeralTaskLifecycle.attemptToRun(tasks[0])).toMatchObject(asOk(tasks[0]));
expect(ephemeralTaskLifecycle.attemptToRun(tasks[1])).toMatchObject(asOk(tasks[1]));
expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2]));
@ -383,23 +395,3 @@ describe('EphemeralTaskLifecycle', () => {
expect(ephemeralTaskLifecycle.queuedTasks).toBe(0);
});
});
/**
 * Test helper: returns a ConcreteTaskInstance populated with defaults,
 * letting callers override any subset of fields.
 */
function mockTask(overrides: Partial<ConcreteTaskInstance> = {}): ConcreteTaskInstance {
  const defaults: ConcreteTaskInstance = {
    id: uuidv4(),
    taskType: 'foo',
    status: TaskStatus.Idle,
    attempts: 0,
    schedule: undefined,
    params: { hello: 'world' },
    state: { baby: 'Henhen' },
    user: 'jimbo',
    scope: ['reporting'],
    ownerId: '',
    runAt: new Date(),
    scheduledAt: new Date(),
    startedAt: null,
    retryAt: null,
  };
  return { ...defaults, ...overrides };
}

View file

@ -7,7 +7,7 @@
import { mockLogger } from '../test_utils';
import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer';
import { createBuffer, Entity, ErrorOutput, BulkOperation } from './bulk_operation_buffer';
import { mapErr, asOk, asErr, Ok, Err } from './result_type';
interface TaskInstance extends Entity {
@ -29,21 +29,20 @@ function incrementAttempts(task: TaskInstance): Ok<TaskInstance> {
});
}
function errorAttempts(task: TaskInstance): Err<OperationError<TaskInstance, Error>> {
function errorAttempts(task: TaskInstance): Err<ErrorOutput> {
return asErr({
entity: incrementAttempts(task).value,
error: { name: '', message: 'Oh no, something went terribly wrong', statusCode: 500 },
type: 'task',
id: task.id,
error: { error: '', message: 'Oh no, something went terribly wrong', statusCode: 500 },
});
}
describe('Bulk Operation Buffer', () => {
describe('createBuffer()', () => {
test('batches up multiple Operation calls', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
([task1, task2]) => {
return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]);
}
);
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance>> = jest.fn(([task1, task2]) => {
return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]);
});
const bufferedUpdate = createBuffer(bulkUpdate);
@ -58,7 +57,7 @@ describe('Bulk Operation Buffer', () => {
});
test('batch updates can be customised to execute after a certain period', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
});
@ -95,7 +94,7 @@ describe('Bulk Operation Buffer', () => {
});
test('batch updates are executed once queue hits a certain bound', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
});
@ -128,7 +127,7 @@ describe('Bulk Operation Buffer', () => {
});
test('queue upper bound is reset after each flush', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
});
@ -164,7 +163,7 @@ describe('Bulk Operation Buffer', () => {
});
test('handles both resolutions and rejections at individual task level', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance>> = jest.fn(
([task1, task2, task3]) => {
return Promise.resolve([
incrementAttempts(task1),
@ -183,10 +182,7 @@ describe('Bulk Operation Buffer', () => {
await Promise.all([
expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)),
expect(bufferedUpdate(task2)).rejects.toMatchObject(
mapErr(
(err: OperationError<TaskInstance, Error>) => asErr(err.error),
errorAttempts(task2)
)
mapErr((err: ErrorOutput) => asErr(err), errorAttempts(task2))
),
expect(bufferedUpdate(task3)).resolves.toMatchObject(incrementAttempts(task3)),
]);
@ -195,7 +191,7 @@ describe('Bulk Operation Buffer', () => {
});
test('handles bulkUpdate failure', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(() => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance>> = jest.fn(() => {
return Promise.reject(new Error('bulkUpdate is an illusion'));
});
@ -230,7 +226,7 @@ describe('Bulk Operation Buffer', () => {
});
test('logs unknown bulk operation results', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance>> = jest.fn(
([task1, task2, task3]) => {
return Promise.resolve([
incrementAttempts(task1),

View file

@ -9,6 +9,7 @@ import { Logger } from '@kbn/core/server';
import { map } from 'lodash';
import { Subject, race, from } from 'rxjs';
import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators';
import { SavedObjectError } from '@kbn/core-saved-objects-common';
import { either, Result, asOk, asErr, Ok, Err } from './result_type';
export interface BufferOptions {
@ -21,36 +22,30 @@ export interface Entity {
id: string;
}
export interface OperationError<Input, ErrorOutput> {
entity: Input;
error: ErrorOutput;
export interface ErrorOutput {
type: string;
id: string;
error: SavedObjectError;
}
export type OperationResult<Input, ErrorOutput, Output = Input> = Result<
Output,
OperationError<Input, ErrorOutput>
>;
export type OperationResult<T> = Result<T, ErrorOutput>;
export type Operation<Input, ErrorOutput, Output = Input> = (
entity: Input
) => Promise<Result<Output, ErrorOutput>>;
export type Operation<T> = (entity: T) => Promise<Result<T, ErrorOutput>>;
export type BulkOperation<Input, ErrorOutput, Output = Input> = (
entities: Input[]
) => Promise<Array<OperationResult<Input, ErrorOutput, Output>>>;
export type BulkOperation<T> = (entities: T[]) => Promise<Array<OperationResult<T>>>;
const DONT_FLUSH = false;
const FLUSH = true;
export function createBuffer<Input extends Entity, ErrorOutput, Output extends Entity = Input>(
bulkOperation: BulkOperation<Input, ErrorOutput, Output>,
export function createBuffer<T extends Entity>(
bulkOperation: BulkOperation<T>,
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions = {}
): Operation<Input, ErrorOutput, Output> {
): Operation<T> {
const flushBuffer = new Subject<void>();
const storeUpdateBuffer = new Subject<{
entity: Input;
onSuccess: (entity: Ok<Output>) => void;
entity: T;
onSuccess: (entity: Ok<T>) => void;
onFailure: (error: Err<ErrorOutput | Error>) => void;
}>();
@ -82,17 +77,15 @@ export function createBuffer<Input extends Entity, ErrorOutput, Output extends E
}
);
},
({ entity, error }: OperationError<Input, ErrorOutput>) => {
(error: ErrorOutput) => {
either(
pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id),
pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === error.id),
({ onFailure }) => {
onFailure(asErr(error));
},
() => {
if (logger) {
logger.warn(
`Unhandled failed Bulk Operation result: ${entity?.id ? entity.id : entity}`
);
logger.warn(`Unhandled failed Bulk Operation result: ${error.id}`);
}
}
);
@ -148,7 +141,7 @@ export function createBuffer<Input extends Entity, ErrorOutput, Output extends E
error: flushAndResetCounter,
});
return async function (entity: Input) {
return async function (entity: T) {
return new Promise((resolve, reject) => {
storeUpdateBuffer.next({ entity, onSuccess: resolve, onFailure: reject });
});

View file

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { asErr, asOk } from './result_type';
import { retryableBulkUpdate } from './retryable_bulk_update';
import { taskStoreMock } from '../task_store.mock';
import { TaskStatus } from '../task';
import { taskManagerMock } from '../mocks';
describe('retryableBulkUpdate()', () => {
  // Three known task ids with matching mock task documents.
  const taskIds = ['1', '2', '3'];
  const tasks = [
    taskManagerMock.createTask({ id: '1' }),
    taskManagerMock.createTask({ id: '2' }),
    taskManagerMock.createTask({ id: '3' }),
  ];
  const getTasks = jest.fn();
  const filter = jest.fn();
  const map = jest.fn();
  const store = taskStoreMock.create();
  beforeEach(() => {
    jest.resetAllMocks();
    // Happy-path defaults: every task is found, passes the filter,
    // maps to itself, and updates successfully.
    getTasks.mockResolvedValue(tasks.map((task) => asOk(task)));
    filter.mockImplementation(() => true);
    map.mockImplementation((task) => task);
    store.bulkUpdate.mockResolvedValue(tasks.map((task) => asOk(task)));
  });
  it('should call getTasks with taskIds', async () => {
    await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
    expect(getTasks).toHaveBeenCalledWith(taskIds);
  });
  it('should filter tasks returned from getTasks', async () => {
    // Only task '2' survives the filter, so only it is mapped and updated.
    filter.mockImplementation((task) => task.id === '2');
    await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
    expect(filter).toHaveBeenCalledTimes(3);
    // Map happens after filter
    expect(map).toHaveBeenCalledTimes(1);
    expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[1]]);
  });
  it('should map tasks returned from getTasks', async () => {
    // The mapped (Claiming) documents are what reach the store.
    map.mockImplementation((task) => ({ ...task, status: TaskStatus.Claiming }));
    await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
    expect(map).toHaveBeenCalledTimes(3);
    expect(store.bulkUpdate).toHaveBeenCalledWith(
      tasks.map((task) => ({ ...task, status: TaskStatus.Claiming }))
    );
  });
  it('should retry tasks that have a status code of 409', async () => {
    // First pass: task 1 hits a version conflict, tasks 2 and 3 succeed.
    getTasks.mockResolvedValueOnce(tasks.map((task) => asOk(task)));
    store.bulkUpdate.mockResolvedValueOnce([
      asErr({
        type: 'task',
        id: tasks[0].id,
        error: {
          statusCode: 409,
          error: 'Conflict',
          message: 'Conflict',
        },
      }),
      asOk(tasks[1]),
      asOk(tasks[2]),
    ]);
    // Second pass: only the conflicted task is re-fetched and re-updated.
    getTasks.mockResolvedValueOnce([tasks[0]].map((task) => asOk(task)));
    store.bulkUpdate.mockResolvedValueOnce(tasks.map((task) => asOk(task)));
    await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
    expect(store.bulkUpdate).toHaveBeenCalledTimes(2);
    expect(store.bulkUpdate).toHaveBeenNthCalledWith(2, [tasks[0]]);
  });
  it('should skip updating tasks that cannot be found', async () => {
    getTasks.mockResolvedValue([
      asOk(tasks[0]),
      // 404: recorded as an error and never sent to bulkUpdate.
      asErr({
        id: tasks[1].id,
        type: 'task',
        error: { error: 'Oh no', message: 'Oh no', statusCode: 404 },
      }),
      asOk(tasks[2]),
    ]);
    await retryableBulkUpdate({ taskIds, getTasks, filter, map, store });
    expect(store.bulkUpdate).toHaveBeenCalledWith([tasks[0], tasks[2]]);
  });
});

View file

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectError } from '@kbn/core-saved-objects-common';
import { ConcreteTaskInstance } from '../task';
import { TaskStore, BulkUpdateResult, BulkGetResult } from '../task_store';
import { isErr, isOk, asErr } from './result_type';
import { BulkUpdateTaskResult } from '../task_scheduling';
// Number of additional update passes attempted for 409 version conflicts.
export const MAX_RETRIES = 2;
export interface RetryableBulkUpdateOpts {
  // Ids of the tasks to update.
  taskIds: string[];
  // Fetches the latest version of the given tasks (presumably store.bulkGet — confirm at call sites).
  getTasks: (taskIds: string[]) => Promise<BulkGetResult>;
  // Predicate deciding whether a fetched task should be updated at all.
  filter: (task: ConcreteTaskInstance) => boolean;
  // Transformation applied to each task before it is persisted.
  map: (task: ConcreteTaskInstance) => ConcreteTaskInstance;
  // Task store used to persist the updates.
  store: TaskStore;
}
/**
 * Runs a bulk update against the task store and retries (up to MAX_RETRIES
 * additional passes) any documents that failed with a 409 version conflict.
 * Tasks that cannot be fetched are reported as errors; tasks rejected by
 * `filter` are skipped entirely.
 */
export async function retryableBulkUpdate({
  taskIds,
  getTasks,
  filter,
  map,
  store,
}: RetryableBulkUpdateOpts): Promise<BulkUpdateTaskResult> {
  // Latest outcome per task id; later passes overwrite earlier ones.
  const resultMap: Record<string, BulkUpdateResult> = {};

  // One fetch → filter → map → bulkUpdate pass over the given ids.
  async function runUpdatePass(idsToAttempt: string[]) {
    const fetched = await getTasks(idsToAttempt);
    const candidates: ConcreteTaskInstance[] = [];
    for (const outcome of fetched) {
      if (isErr(outcome)) {
        // e.g. not found — record the failure, nothing to update.
        resultMap[outcome.error.id] = buildBulkUpdateErr(outcome.error);
      } else {
        candidates.push(outcome.value);
      }
    }
    const docs = candidates.filter(filter).map(map);
    for (const updateResult of await store.bulkUpdate(docs)) {
      resultMap[getId(updateResult)] = updateResult;
    }
  }

  await runUpdatePass(taskIds);
  for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
    const conflictedIds = getRetriableTaskIds(resultMap);
    if (conflictedIds.length === 0) {
      break;
    }
    await runUpdatePass(conflictedIds);
  }

  // Partition the final outcomes into the public response shape.
  const response: BulkUpdateTaskResult = { tasks: [], errors: [] };
  for (const result of Object.values(resultMap)) {
    if (isOk(result)) {
      response.tasks.push(result.value);
    } else {
      response.errors.push(result.error);
    }
  }
  return response;
}
// Extracts the task id from either arm of a BulkUpdateResult.
function getId(bulkUpdateResult: BulkUpdateResult): string {
  if (isOk(bulkUpdateResult)) {
    return bulkUpdateResult.value.id;
  }
  return bulkUpdateResult.error.id;
}
// Ids of all tasks whose latest outcome was a 409 version conflict.
function getRetriableTaskIds(resultMap: Record<string, BulkUpdateResult>) {
  const conflicted: string[] = [];
  for (const result of Object.values(resultMap)) {
    if (isErr(result) && result.error.error.statusCode === 409) {
      conflicted.push(result.error.id);
    }
  }
  return conflicted;
}
// Normalizes a saved-object style failure into the bulk-update Err shape,
// flattening any error metadata into the error payload.
function buildBulkUpdateErr(error: { type: string; id: string; error: SavedObjectError }) {
  const { error: soError } = error;
  const payload = {
    error: soError.error,
    statusCode: soError.statusCode,
    message: soError.message,
    ...(soError.metadata ? soError.metadata : {}),
  };
  return asErr({ id: error.id, type: error.type, error: payload });
}

View file

@ -5,7 +5,9 @@
* 2.0.
*/
import { v4 as uuidv4 } from 'uuid';
import { TaskManagerSetupContract, TaskManagerStartContract } from './plugin';
import { ConcreteTaskInstance, TaskStatus } from './task';
const createSetupMock = () => {
const mock: jest.Mocked<TaskManagerSetupContract> = {
@ -38,7 +40,29 @@ const createStartMock = () => {
return mock;
};
/**
 * Builds a ConcreteTaskInstance with fixed defaults for unit tests;
 * pass `overrides` to customize any subset of fields.
 */
const createTaskMock = (overrides: Partial<ConcreteTaskInstance> = {}): ConcreteTaskInstance => {
  const defaults: ConcreteTaskInstance = {
    id: `task_${uuidv4()}`,
    taskType: 'foo',
    status: TaskStatus.Idle,
    attempts: 0,
    schedule: undefined,
    params: { hello: 'world' },
    state: { foo: 'bar' },
    user: undefined,
    scope: undefined,
    ownerId: '123',
    version: '123',
    runAt: new Date(),
    scheduledAt: new Date(),
    startedAt: null,
    retryAt: null,
  };
  return { ...defaults, ...overrides };
};
export const taskManagerMock = {
createSetup: createSetupMock,
createStart: createStartMock,
createTask: createTaskMock,
};

View file

@ -13,14 +13,14 @@ import { asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { TaskScheduling } from './task_scheduling';
import { asErr, asOk } from './lib/result_type';
import { ConcreteTaskInstance, TaskStatus } from './task';
import { TaskStatus } from './task';
import { createInitialMiddleware } from './lib/middleware';
import { taskStoreMock } from './task_store.mock';
import { TaskRunResult } from './task_running';
import { mockLogger } from './test_utils';
import { TaskTypeDictionary } from './task_type_dictionary';
import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock';
import { mustBeAllOf } from './queries/query_clauses';
import { taskManagerMock } from './mocks';
let fakeTimer: sinon.SinonFakeTimers;
jest.mock('uuid', () => ({
@ -156,75 +156,72 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
beforeEach(() => {
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: mockTask() }])
Promise.resolve([{ tag: 'ok', value: taskManagerMock.createTask() }])
);
});
test('should search for tasks by ids enabled = false when enabling', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnable([id]);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
expect(mockTaskStore.fetch).toHaveBeenCalledWith({
query: {
bool: {
must: [
{
terms: {
_id: [`task:${id}`],
},
},
{
term: {
'task.enabled': false,
},
},
],
},
},
size: 100,
});
});
test('should split search on chunks when input ids array too large', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
mockTaskStore.bulkGet.mockResolvedValue([]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnable(Array.from({ length: 1250 }), false);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13);
expect(mockTaskStore.bulkGet).toHaveBeenCalledTimes(13);
});
test('should transform response into correct format', async () => {
const successfulTask = mockTask({
const successfulTask = taskManagerMock.createTask({
id: 'task-1',
enabled: false,
schedule: { interval: '1h' },
});
const failedTask = mockTask({ id: 'task-2', enabled: false, schedule: { interval: '1h' } });
const failedToUpdateTask = taskManagerMock.createTask({
id: 'task-2',
enabled: false,
schedule: { interval: '1h' },
});
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([
{ tag: 'ok', value: successfulTask },
{ tag: 'err', error: { entity: failedTask, error: new Error('fail') } },
{
tag: 'err',
error: {
type: 'task',
id: failedToUpdateTask.id,
error: {
statusCode: 400,
error: 'fail',
message: 'fail',
},
},
},
])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(successfulTask), asOk(failedToUpdateTask)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const result = await taskScheduling.bulkEnable([successfulTask.id, failedTask.id]);
const result = await taskScheduling.bulkEnable([successfulTask.id, failedToUpdateTask.id]);
expect(result).toEqual({
tasks: [successfulTask],
errors: [{ task: failedTask, error: new Error('fail') }],
errors: [
{
type: 'task',
id: failedToUpdateTask.id,
error: {
statusCode: 400,
error: 'fail',
message: 'fail',
},
},
],
});
});
test('should not enable task if it is already enabled', async () => {
const task = mockTask({ id, enabled: true, schedule: { interval: '3h' } });
const task = taskManagerMock.createTask({ id, enabled: true, schedule: { interval: '3h' } });
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(task)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnable([id]);
@ -235,7 +232,7 @@ describe('TaskScheduling', () => {
});
test('should set runAt and scheduledAt if runSoon is true', async () => {
const task = mockTask({
const task = taskManagerMock.createTask({
id,
enabled: false,
schedule: { interval: '3h' },
@ -245,7 +242,7 @@ describe('TaskScheduling', () => {
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: task }])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(task)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnable([id]);
@ -263,7 +260,7 @@ describe('TaskScheduling', () => {
});
test('should not set runAt and scheduledAt if runSoon is false', async () => {
const task = mockTask({
const task = taskManagerMock.createTask({
id,
enabled: false,
schedule: { interval: '3h' },
@ -273,7 +270,7 @@ describe('TaskScheduling', () => {
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: task }])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(task)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnable([id], false);
@ -293,75 +290,72 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
beforeEach(() => {
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: mockTask() }])
Promise.resolve([{ tag: 'ok', value: taskManagerMock.createTask() }])
);
});
test('should search for tasks by ids enabled = true when disabling', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkDisable([id]);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
expect(mockTaskStore.fetch).toHaveBeenCalledWith({
query: {
bool: {
must: [
{
terms: {
_id: [`task:${id}`],
},
},
{
term: {
'task.enabled': true,
},
},
],
},
},
size: 100,
});
});
test('should split search on chunks when input ids array too large', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
mockTaskStore.bulkGet.mockResolvedValue([]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkDisable(Array.from({ length: 1250 }));
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13);
expect(mockTaskStore.bulkGet).toHaveBeenCalledTimes(13);
});
test('should transform response into correct format', async () => {
const successfulTask = mockTask({
const successfulTask = taskManagerMock.createTask({
id: 'task-1',
enabled: false,
schedule: { interval: '1h' },
});
const failedTask = mockTask({ id: 'task-2', enabled: true, schedule: { interval: '1h' } });
const failedToUpdateTask = taskManagerMock.createTask({
id: 'task-2',
enabled: true,
schedule: { interval: '1h' },
});
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([
{ tag: 'ok', value: successfulTask },
{ tag: 'err', error: { entity: failedTask, error: new Error('fail') } },
{
tag: 'err',
error: {
type: 'task',
id: failedToUpdateTask.id,
error: {
statusCode: 400,
error: 'fail',
message: 'fail',
},
},
},
])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(successfulTask), asOk(failedToUpdateTask)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const result = await taskScheduling.bulkDisable([successfulTask.id, failedTask.id]);
const result = await taskScheduling.bulkDisable([successfulTask.id, failedToUpdateTask.id]);
expect(result).toEqual({
tasks: [successfulTask],
errors: [{ task: failedTask, error: new Error('fail') }],
errors: [
{
type: 'task',
id: failedToUpdateTask.id,
error: {
statusCode: 400,
error: 'fail',
message: 'fail',
},
},
],
});
});
test('should not disable task if it is already disabled', async () => {
const task = mockTask({ id, enabled: false, schedule: { interval: '3h' } });
const task = taskManagerMock.createTask({ id, enabled: false, schedule: { interval: '3h' } });
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(task)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkDisable([id]);
@ -376,69 +370,73 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
beforeEach(() => {
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: mockTask() }])
Promise.resolve([{ tag: 'ok', value: taskManagerMock.createTask() }])
);
});
test('should search for tasks by ids and idle status', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkUpdateSchedules([id], { interval: '1h' });
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
expect(mockTaskStore.fetch).toHaveBeenCalledWith({
query: mustBeAllOf(
{
terms: {
_id: [`task:${id}`],
},
},
{
term: {
'task.status': 'idle',
},
}
),
size: 100,
});
});
test('should split search on chunks when input ids array too large', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
mockTaskStore.bulkGet.mockResolvedValue([]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkUpdateSchedules(Array.from({ length: 1250 }), { interval: '1h' });
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13);
expect(mockTaskStore.bulkGet).toHaveBeenCalledTimes(13);
});
test('should transform response into correct format', async () => {
const successfulTask = mockTask({ id: 'task-1', schedule: { interval: '1h' } });
const failedTask = mockTask({ id: 'task-2', schedule: { interval: '1h' } });
const successfulTask = taskManagerMock.createTask({
id: 'task-1',
schedule: { interval: '1h' },
});
const failedToUpdateTask = taskManagerMock.createTask({
id: 'task-2',
schedule: { interval: '1h' },
});
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([
{ tag: 'ok', value: successfulTask },
{ tag: 'err', error: { entity: failedTask, error: new Error('fail') } },
{
tag: 'err',
error: {
type: 'task',
id: failedToUpdateTask.id,
error: {
statusCode: 400,
error: 'fail',
message: 'fail',
},
},
},
])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(successfulTask), asOk(failedToUpdateTask)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const result = await taskScheduling.bulkUpdateSchedules([successfulTask.id, failedTask.id], {
interval: '1h',
});
const result = await taskScheduling.bulkUpdateSchedules(
[successfulTask.id, failedToUpdateTask.id],
{ interval: '1h' }
);
expect(result).toEqual({
tasks: [successfulTask],
errors: [{ task: failedTask, error: new Error('fail') }],
errors: [
{
type: 'task',
id: failedToUpdateTask.id,
error: {
statusCode: 400,
error: 'fail',
message: 'fail',
},
},
],
});
});
test('should not update task if new interval is equal to previous', async () => {
const task = mockTask({ id, schedule: { interval: '3h' } });
const task = taskManagerMock.createTask({ id, schedule: { interval: '3h' } });
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(task)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkUpdateSchedules([id], { interval: '3h' });
@ -451,9 +449,13 @@ describe('TaskScheduling', () => {
test('should postpone task run if new interval is greater than previous', async () => {
// task set to be run in 2 hrs from now
const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds());
const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs });
const task = taskManagerMock.createTask({
id,
schedule: { interval: '3h' },
runAt: runInTwoHrs,
});
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(task)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkUpdateSchedules([id], { interval: '5h' });
@ -471,9 +473,13 @@ describe('TaskScheduling', () => {
test('should set task run sooner if new interval is lesser than previous', async () => {
// task set to be run in one 2hrs from now
const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds());
const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs });
const task = taskManagerMock.createTask({
id,
schedule: { interval: '3h' },
runAt: runInTwoHrs,
});
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(task)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkUpdateSchedules([id], { interval: '2h' });
@ -490,9 +496,13 @@ describe('TaskScheduling', () => {
test('should set task run to now if time that passed from last run is greater than new interval', async () => {
// task set to be run in one 1hr from now. With interval of '2h', it means last run happened 1 hour ago
const runInOneHr = new Date(Date.now() + moment.duration(1, 'hour').asMilliseconds());
const task = mockTask({ id, schedule: { interval: '2h' }, runAt: runInOneHr });
const task = taskManagerMock.createTask({
id,
schedule: { interval: '2h' },
runAt: runInOneHr,
});
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
mockTaskStore.bulkGet.mockResolvedValue([asOk(task)]);
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkUpdateSchedules([id], { interval: '30m' });
@ -511,13 +521,15 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Idle }));
mockTaskStore.update.mockResolvedValueOnce(mockTask({ id }));
mockTaskStore.get.mockResolvedValueOnce(
taskManagerMock.createTask({ id, status: TaskStatus.Idle })
);
mockTaskStore.update.mockResolvedValueOnce(taskManagerMock.createTask({ id }));
const result = await taskScheduling.runSoon(id);
expect(mockTaskStore.update).toHaveBeenCalledWith(
mockTask({
taskManagerMock.createTask({
id,
status: TaskStatus.Idle,
runAt: expect.any(Date),
@ -532,12 +544,14 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Failed }));
mockTaskStore.update.mockResolvedValueOnce(mockTask({ id }));
mockTaskStore.get.mockResolvedValueOnce(
taskManagerMock.createTask({ id, status: TaskStatus.Failed })
);
mockTaskStore.update.mockResolvedValueOnce(taskManagerMock.createTask({ id }));
const result = await taskScheduling.runSoon(id);
expect(mockTaskStore.update).toHaveBeenCalledWith(
mockTask({
taskManagerMock.createTask({
id,
status: TaskStatus.Idle,
runAt: expect.any(Date),
@ -552,7 +566,9 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Idle }));
mockTaskStore.get.mockResolvedValueOnce(
taskManagerMock.createTask({ id, status: TaskStatus.Idle })
);
mockTaskStore.update.mockRejectedValueOnce(500);
const result = taskScheduling.runSoon(id);
@ -566,7 +582,9 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Idle }));
mockTaskStore.get.mockResolvedValueOnce(
taskManagerMock.createTask({ id, status: TaskStatus.Idle })
);
mockTaskStore.update.mockRejectedValueOnce({ statusCode: 409 });
const result = await taskScheduling.runSoon(id);
@ -580,7 +598,9 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Claiming }));
mockTaskStore.get.mockResolvedValueOnce(
taskManagerMock.createTask({ id, status: TaskStatus.Claiming })
);
mockTaskStore.update.mockRejectedValueOnce(409);
const result = taskScheduling.runSoon(id);
@ -595,7 +615,9 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Running }));
mockTaskStore.get.mockResolvedValueOnce(
taskManagerMock.createTask({ id, status: TaskStatus.Running })
);
mockTaskStore.update.mockRejectedValueOnce(409);
const result = taskScheduling.runSoon(id);
@ -610,7 +632,9 @@ describe('TaskScheduling', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Unrecognized }));
mockTaskStore.get.mockResolvedValueOnce(
taskManagerMock.createTask({ id, status: TaskStatus.Unrecognized })
);
mockTaskStore.update.mockRejectedValueOnce(409);
const result = taskScheduling.runSoon(id);
@ -633,7 +657,7 @@ describe('TaskScheduling', () => {
describe('ephemeralRunNow', () => {
test('runs a task ephemerally', async () => {
const ephemeralEvents$ = new Subject<TaskLifecycleEvent>();
const ephemeralTask = mockTask({
const ephemeralTask = taskManagerMock.createTask({
state: {
foo: 'bar',
},
@ -679,7 +703,7 @@ describe('TaskScheduling', () => {
test('rejects ephemeral task if lifecycle returns an error', async () => {
const ephemeralEvents$ = new Subject<TaskLifecycleEvent>();
const ephemeralTask = mockTask({
const ephemeralTask = taskManagerMock.createTask({
state: {
foo: 'bar',
},
@ -723,7 +747,7 @@ describe('TaskScheduling', () => {
});
test('rejects ephemeral task if ephemeralTaskLifecycle is not defined', async () => {
const ephemeralTask = mockTask({
const ephemeralTask = taskManagerMock.createTask({
state: {
foo: 'bar',
},
@ -820,25 +844,3 @@ describe('TaskScheduling', () => {
});
});
});
function mockTask(overrides: Partial<ConcreteTaskInstance> = {}): ConcreteTaskInstance {
return {
id: 'claimed-by-id',
runAt: new Date(),
taskType: 'foo',
schedule: undefined,
attempts: 0,
enabled: true,
status: TaskStatus.Claiming,
params: { hello: 'world' },
state: { baby: 'Henhen' },
user: 'jimbo',
scope: ['reporting'],
ownerId: '',
startedAt: null,
retryAt: null,
scheduledAt: new Date(),
traceparent: 'taskTraceparent',
...overrides,
};
}

View file

@ -7,14 +7,13 @@
import { filter, take } from 'rxjs/operators';
import pMap from 'p-map';
import { SavedObjectError } from '@kbn/core-saved-objects-common';
import { v4 as uuidv4 } from 'uuid';
import { chunk, pick } from 'lodash';
import { chunk, flatten, pick } from 'lodash';
import { Subject } from 'rxjs';
import agent from 'elastic-apm-node';
import { Logger } from '@kbn/core/server';
import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { mustBeAllOf } from './queries/query_clauses';
import { either, isErr, mapErr } from './lib/result_type';
import {
ErroredTask,
@ -40,6 +39,7 @@ import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fie
import { TaskLifecycleEvent } from './polling_lifecycle';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTaskRejectedDueToCapacityError } from './task_running';
import { retryableBulkUpdate } from './lib/retryable_bulk_update';
const VERSION_CONFLICT_STATUS = 409;
const BULK_ACTION_SIZE = 100;
@ -63,7 +63,7 @@ export interface BulkUpdateTaskResult {
/**
* list of failed tasks and errors caused failure
*/
errors: Array<{ task: ConcreteTaskInstance; error: Error }>;
errors: Array<{ type: string; id: string; error: SavedObjectError }>;
}
export interface RunSoonResult {
id: ConcreteTaskInstance['id'];
@ -153,52 +153,28 @@ export class TaskScheduling {
}
public async bulkDisable(taskIds: string[]) {
const enabledTasks = await this.bulkGetTasksHelper(taskIds, {
term: {
'task.enabled': true,
},
return await retryableBulkUpdate({
taskIds,
store: this.store,
getTasks: async (ids) => await this.bulkGetTasksHelper(ids),
filter: (task) => !!task.enabled,
map: (task) => ({ ...task, enabled: false }),
});
const updatedTasks = enabledTasks
.flatMap(({ docs }) => docs)
.reduce<ConcreteTaskInstance[]>((acc, task) => {
// if task is not enabled, no need to update it
if (!task.enabled) {
return acc;
}
acc.push({ ...task, enabled: false });
return acc;
}, []);
return await this.bulkUpdateTasksHelper(updatedTasks);
}
public async bulkEnable(taskIds: string[], runSoon: boolean = true) {
const disabledTasks = await this.bulkGetTasksHelper(taskIds, {
term: {
'task.enabled': false,
return await retryableBulkUpdate({
taskIds,
store: this.store,
getTasks: async (ids) => await this.bulkGetTasksHelper(ids),
filter: (task) => !task.enabled,
map: (task) => {
if (runSoon) {
return { ...task, enabled: true, scheduledAt: new Date(), runAt: new Date() };
}
return { ...task, enabled: true };
},
});
const updatedTasks = disabledTasks
.flatMap(({ docs }) => docs)
.reduce<ConcreteTaskInstance[]>((acc, task) => {
// if task is enabled, no need to update it
if (task.enabled) {
return acc;
}
if (runSoon) {
acc.push({ ...task, enabled: true, scheduledAt: new Date(), runAt: new Date() });
} else {
acc.push({ ...task, enabled: true });
}
return acc;
}, []);
return await this.bulkUpdateTasksHelper(updatedTasks);
}
/**
@ -214,20 +190,13 @@ export class TaskScheduling {
taskIds: string[],
schedule: IntervalSchedule
): Promise<BulkUpdateTaskResult> {
const tasks = await this.bulkGetTasksHelper(taskIds, {
term: {
'task.status': 'idle',
},
});
const updatedTasks = tasks
.flatMap(({ docs }) => docs)
.reduce<ConcreteTaskInstance[]>((acc, task) => {
// if task schedule interval is the same, no need to update it
if (task.schedule?.interval === schedule.interval) {
return acc;
}
return retryableBulkUpdate({
taskIds,
store: this.store,
getTasks: async (ids) => await this.bulkGetTasksHelper(ids),
filter: (task) =>
task.status === TaskStatus.Idle && task.schedule?.interval !== schedule.interval,
map: (task) => {
const oldIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s');
// computing new runAt using formula:
@ -237,45 +206,18 @@ export class TaskScheduling {
task.runAt.getTime() - oldIntervalInMs + parseIntervalAsMillisecond(schedule.interval)
);
acc.push({ ...task, schedule, runAt: new Date(newRunAtInMs) });
return acc;
}, []);
return await this.bulkUpdateTasksHelper(updatedTasks);
return { ...task, schedule, runAt: new Date(newRunAtInMs) };
},
});
}
private async bulkGetTasksHelper(taskIds: string[], ...must: QueryDslQueryContainer[]) {
return await pMap(
private async bulkGetTasksHelper(taskIds: string[]) {
const batches = await pMap(
chunk(taskIds, BULK_ACTION_SIZE),
async (taskIdsChunk) =>
this.store.fetch({
query: mustBeAllOf(
{
terms: {
_id: taskIdsChunk.map((taskId) => `task:${taskId}`),
},
},
...must
),
size: BULK_ACTION_SIZE,
}),
async (taskIdsChunk) => this.store.bulkGet(taskIdsChunk),
{ concurrency: 10 }
);
}
private async bulkUpdateTasksHelper(updatedTasks: ConcreteTaskInstance[]) {
return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateTaskResult>(
(acc, task) => {
if (task.tag === 'ok') {
acc.tasks.push(task.value);
} else {
acc.errors.push({ error: task.error.error, task: task.error.entity });
}
return acc;
},
{ tasks: [], errors: [] }
);
return flatten(batches);
}
/**

View file

@ -26,6 +26,7 @@ export const taskStoreMock = {
fetch: jest.fn(),
aggregate: jest.fn(),
updateByQuery: jest.fn(),
bulkGet: jest.fn(),
index,
taskManagerId,
} as unknown as jest.Mocked<TaskStore>;

View file

@ -22,6 +22,7 @@ import { SavedObjectAttributes, SavedObjectsErrorHelpers } from '@kbn/core/serve
import { TaskTypeDictionary } from './task_type_dictionary';
import { mockLogger } from './test_utils';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { asErr } from './lib/result_type';
const savedObjectsClient = savedObjectsRepositoryMock.create();
const serializer = savedObjectsServiceMock.createSerializer();
@ -661,6 +662,70 @@ describe('TaskStore', () => {
});
});
describe('bulkGet', () => {
let store: TaskStore;
beforeAll(() => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
esClient: elasticsearchServiceMock.createClusterClient().asInternalUser,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
adHocTaskCounter,
});
});
test('gets a task specified by id', async () => {
savedObjectsClient.bulkGet.mockResolvedValue({ saved_objects: [] });
await store.bulkGet(['1', '2']);
expect(savedObjectsClient.bulkGet).toHaveBeenCalledWith([
{ type: 'task', id: '1' },
{ type: 'task', id: '2' },
]);
});
test('returns error when task not found', async () => {
savedObjectsClient.bulkGet.mockResolvedValue({
saved_objects: [
{
type: 'task',
id: '1',
attributes: {},
references: [],
error: {
error: 'Oh no',
message: 'Oh no',
statusCode: 404,
},
},
],
});
const result = await store.bulkGet(['1']);
expect(result).toEqual([
asErr({
type: 'task',
id: '1',
error: {
error: 'Oh no',
message: 'Oh no',
statusCode: 404,
},
}),
]);
});
test('pushes error from saved objects client to errors$', async () => {
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.bulkGet.mockRejectedValue(new Error('Failure'));
await expect(store.bulkGet([randomId()])).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure"`
);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});
describe('getLifecycle', () => {
test('returns the task status if the task exists ', async () => {
expect.assertions(5);

View file

@ -10,6 +10,7 @@
*/
import { Subject } from 'rxjs';
import { omit, defaults, get } from 'lodash';
import { SavedObjectError } from '@kbn/core-saved-objects-common';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { SavedObjectsBulkDeleteResponse } from '@kbn/core/server';
@ -75,7 +76,11 @@ export interface FetchResult {
export type BulkUpdateResult = Result<
ConcreteTaskInstance,
{ entity: ConcreteTaskInstance; error: Error }
{ type: string; id: string; error: SavedObjectError }
>;
export type BulkGetResult = Array<
Result<ConcreteTaskInstance, { type: string; id: string; error: SavedObjectError }>
>;
export interface UpdateByQueryResult {
@ -252,14 +257,14 @@ export class TaskStore {
return attrsById;
}, new Map());
let updatedSavedObjects: Array<SavedObjectsUpdateResponse | Error>;
let updatedSavedObjects: Array<SavedObjectsUpdateResponse<SerializedConcreteTaskInstance>>;
try {
({ saved_objects: updatedSavedObjects } =
await this.savedObjectsRepository.bulkUpdate<SerializedConcreteTaskInstance>(
docs.map((doc) => ({
type: 'task',
id: doc.id,
options: { version: doc.version },
version: doc.version,
attributes: attributesByDocId.get(doc.id)!,
})),
{
@ -271,9 +276,14 @@ export class TaskStore {
throw e;
}
return updatedSavedObjects.map<BulkUpdateResult>((updatedSavedObject, index) =>
isSavedObjectsUpdateResponse(updatedSavedObject)
? asOk(
return updatedSavedObjects.map((updatedSavedObject) => {
return updatedSavedObject.error !== undefined
? asErr({
type: 'task',
id: updatedSavedObject.id,
error: updatedSavedObject.error,
})
: asOk(
savedObjectToConcreteTaskInstance({
...updatedSavedObject,
attributes: defaults(
@ -281,15 +291,8 @@ export class TaskStore {
attributesByDocId.get(updatedSavedObject.id)!
),
})
)
: asErr({
// The SavedObjectsRepository maintains the order of the docs
// so we can rely on the index in the `docs` to match an error
// on the same index in the `bulkUpdate` result
entity: docs[index],
error: updatedSavedObject,
})
);
);
});
}
/**
@ -340,6 +343,30 @@ export class TaskStore {
return savedObjectToConcreteTaskInstance(result);
}
/**
* Gets tasks by ids
*
* @param {Array<string>} ids
* @returns {Promise<ConcreteTaskInstance[]>}
*/
public async bulkGet(ids: string[]): Promise<BulkGetResult> {
let result;
try {
result = await this.savedObjectsRepository.bulkGet<SerializedConcreteTaskInstance>(
ids.map((id) => ({ type: 'task', id }))
);
} catch (e) {
this.errors$.next(e);
throw e;
}
return result.saved_objects.map((task) => {
if (task.error) {
return asErr({ id: task.id, type: task.type, error: task.error });
}
return asOk(savedObjectToConcreteTaskInstance(task));
});
}
/**
* Gets task lifecycle step by id
*
@ -535,9 +562,3 @@ function ensureAggregationOnlyReturnsTaskObjects(opts: AggregationOpts): Aggrega
query,
};
}
function isSavedObjectsUpdateResponse(
result: SavedObjectsUpdateResponse | Error
): result is SavedObjectsUpdateResponse {
return result && typeof (result as SavedObjectsUpdateResponse).id === 'string';
}

View file

@ -17,6 +17,7 @@
"@kbn/safer-lodash-set",
"@kbn/es-types",
"@kbn/apm-utils",
"@kbn/core-saved-objects-common",
],
"exclude": [
"target/**/*",

View file

@ -26,8 +26,7 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte
const supertest = getService('supertest');
const supertestWithoutAuth = getService('supertestWithoutAuth');
// Failing: See https://github.com/elastic/kibana/issues/141849
describe.skip('disable', () => {
describe('disable', () => {
const objectRemover = new ObjectRemover(supertest);
after(() => objectRemover.removeAll());

View file

@ -26,8 +26,7 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
const retry = getService('retry');
const supertest = getService('supertest');
// Failing: See https://github.com/elastic/kibana/issues/141864
describe.skip('disable', () => {
describe('disable', () => {
const objectRemover = new ObjectRemover(supertestWithoutAuth);
const ruleUtils = new RuleUtils({ space: Spaces.space1, supertestWithoutAuth });

View file

@ -26,6 +26,7 @@ export default function ({ getService }: FtrProviderContext) {
const security = getService('security');
const esDeleteAllIndices = getService('esDeleteAllIndices');
const config = getService('config');
const retry = getService('retry');
const log = getService('log');
const randomness = getService('randomness');
const testUser = { username: 'test_user', password: 'changeme' };
@ -134,19 +135,19 @@ export default function ({ getService }: FtrProviderContext) {
return parseCookie(authenticationResponse.headers['set-cookie'][0])!;
}
async function toggleSessionCleanupTask(enabled: boolean) {
await supertest
.post('/session/toggle_cleanup_task')
.set('kbn-xsrf', 'xxx')
.auth(adminTestUser.username, adminTestUser.password)
.send({ enabled })
.expect(200);
async function runCleanupTaskSoon() {
// In most cases, an error would mean the task is currently running so let's run it again
await retry.tryForTime(30000, async () => {
await supertest
.post('/session/_run_cleanup')
.set('kbn-xsrf', 'xxx')
.auth(adminTestUser.username, adminTestUser.password)
.send()
.expect(200);
});
}
// FLAKY: https://github.com/elastic/kibana/issues/149092
// FLAKY: https://github.com/elastic/kibana/issues/149091
// FLAKY: https://github.com/elastic/kibana/issues/149090
describe.skip('Session Concurrent Limit cleanup', () => {
describe('Session Concurrent Limit cleanup', () => {
before(async () => {
await security.user.create('anonymous_user', {
password: 'changeme',
@ -161,7 +162,6 @@ export default function ({ getService }: FtrProviderContext) {
beforeEach(async function () {
this.timeout(120000);
await toggleSessionCleanupTask(false);
await es.cluster.health({ index: '.kibana_security_session*', wait_for_status: 'green' });
await esDeleteAllIndices('.kibana_security_session*');
});
@ -179,13 +179,13 @@ export default function ({ getService }: FtrProviderContext) {
expect(await getNumberOfSessionDocuments()).to.be(3);
// Let's wait for 60s to make sure cleanup routine runs after it was enabled.
// Poke the background task to run
await runCleanupTaskSoon();
log.debug('Waiting for cleanup job to run...');
await toggleSessionCleanupTask(true);
await setTimeoutAsync(60000);
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(2);
await retry.tryForTime(30000, async () => {
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(2);
});
await checkSessionCookieInvalid(basicSessionCookieOne);
await checkSessionCookie(basicSessionCookieTwo, testUser.username, basicProvider);
@ -208,13 +208,13 @@ export default function ({ getService }: FtrProviderContext) {
expect(await getNumberOfSessionDocuments()).to.be(6);
// Let's wait for 60s to make sure cleanup routine runs after it was enabled.
// Poke the background task to run
await runCleanupTaskSoon();
log.debug('Waiting for cleanup job to run...');
await toggleSessionCleanupTask(true);
await setTimeoutAsync(60000);
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(4);
await retry.tryForTime(30000, async () => {
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(4);
});
await checkSessionCookieInvalid(basicSessionCookieOne);
await checkSessionCookie(basicSessionCookieTwo, testUser.username, basicProvider);
@ -275,13 +275,13 @@ export default function ({ getService }: FtrProviderContext) {
refresh: true,
});
// Let's wait for 60s to make sure cleanup routine runs after it was enabled.
// Poke the background task to run
await runCleanupTaskSoon();
log.debug('Waiting for cleanup job to run...');
await toggleSessionCleanupTask(true);
await setTimeoutAsync(60000);
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(4);
await retry.tryForTime(30000, async () => {
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(4);
});
await checkSessionCookie(basicSessionCookieOne, testUser.username, basicProvider);
await checkSessionCookie(basicSessionCookieTwo, testUser.username, basicProvider);
@ -303,13 +303,13 @@ export default function ({ getService }: FtrProviderContext) {
expect(await getNumberOfSessionDocuments()).to.be(2);
// Let's wait for 60s to make sure cleanup routine runs after it was enabled.
// Poke the background task to run
await runCleanupTaskSoon();
log.debug('Waiting for cleanup job to run...');
await toggleSessionCleanupTask(true);
await setTimeoutAsync(60000);
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(2);
await retry.tryForTime(30000, async () => {
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(2);
});
await checkSessionCookie(basicSessionCookieOne, testUser.username, basicProvider);
await checkSessionCookie(basicSessionCookieTwo, testUser.username, basicProvider);
@ -326,13 +326,13 @@ export default function ({ getService }: FtrProviderContext) {
expect(await getNumberOfSessionDocuments()).to.be(3);
// Let's wait for 60s to make sure cleanup routine runs after it was enabled.
// Poke the background task to run
await runCleanupTaskSoon();
log.debug('Waiting for cleanup job to run...');
await toggleSessionCleanupTask(true);
await setTimeoutAsync(60000);
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(3);
await retry.tryForTime(30000, async () => {
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(3);
});
// All sessions should be active.
for (const anonymousSessionCookie of [
@ -355,13 +355,13 @@ export default function ({ getService }: FtrProviderContext) {
expect(await getNumberOfSessionDocuments()).to.be(3);
// Let's wait for 60s to make sure cleanup routine runs after it was enabled.
// Poke the background task to run
await runCleanupTaskSoon();
log.debug('Waiting for cleanup job to run...');
await toggleSessionCleanupTask(true);
await setTimeoutAsync(60000);
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(3);
await retry.tryForTime(30000, async () => {
// The oldest session should have been removed, but the rest should still be valid.
expect(await getNumberOfSessionDocuments()).to.be(3);
});
// Finish SAML handshake (all should succeed since we don't enforce limit at session creation time).
const samlSessionCookieOne = await finishSAMLHandshake(

View file

@ -177,6 +177,18 @@ export function initRoutes(
}
}
router.post(
{
path: '/session/_run_cleanup',
validate: false,
},
async (context, request, response) => {
const [, { taskManager }] = await core.getStartServices();
await taskManager.runSoon(SESSION_INDEX_CLEANUP_TASK_NAME);
return response.ok();
}
);
router.post(
{
path: '/session/toggle_cleanup_task',