Replace TaskManager's runNow with runSoon (#134324)

* Replace TaskManager's runNow with runSoon
This commit is contained in:
Ersin Erdal 2022-06-28 15:58:15 +02:00 committed by GitHub
parent 35265fcd10
commit a52957760a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 312 additions and 582 deletions

View file

@ -1398,7 +1398,7 @@
"signature": [
"Pick<",
"TaskScheduling",
", \"schedule\" | \"runNow\" | \"ephemeralRunNow\" | \"ensureScheduled\" | \"bulkUpdateSchedules\"> & Pick<",
", \"schedule\" | \"runSoon\" | \"ephemeralRunNow\" | \"ensureScheduled\" | \"bulkUpdateSchedules\"> & Pick<",
"TaskStore",
", \"fetch\" | \"get\" | \"remove\"> & { removeIfExists: (id: string) => Promise<void>; } & { supportsEphemeralTasks: () => boolean; }"
],
@ -1416,4 +1416,4 @@
"misc": [],
"objects": []
}
}
}

View file

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { DashboardPlugin } from './plugin';
import { coreMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import { scheduleDashboardTelemetry, TASK_ID } from './usage/dashboard_telemetry_collection_task';
jest.mock('./usage/dashboard_telemetry_collection_task', () => ({
scheduleDashboardTelemetry: jest.fn().mockResolvedValue('ok'),
TASK_ID: 'mockTaskID',
}));
describe('DashboardPlugin', () => {
describe('start', () => {
let mockCoreStart: ReturnType<typeof coreMock.createStart>;
let initContext: ReturnType<typeof coreMock.createPluginInitializerContext>;
let mockTaskManager: ReturnType<typeof taskManagerMock.createStart>;
beforeEach(() => {
mockCoreStart = coreMock.createStart();
mockTaskManager = taskManagerMock.createStart();
initContext = coreMock.createPluginInitializerContext();
});
afterEach(() => {
jest.clearAllMocks();
});
test('should call mockTaskManager.runSoon', async () => {
const dashboardPlugin = new DashboardPlugin(initContext);
dashboardPlugin.start(mockCoreStart, {
taskManager: mockTaskManager,
});
expect(scheduleDashboardTelemetry).toHaveBeenCalledTimes(1);
expect(await mockTaskManager.runSoon).toHaveBeenCalledTimes(1);
expect(await mockTaskManager.runSoon).toHaveBeenCalledWith(TASK_ID);
});
test('error from runSoon is handled gracefully', async () => {
const dashboardPlugin = new DashboardPlugin(initContext);
mockTaskManager.runSoon.mockRejectedValueOnce(500);
const response = dashboardPlugin.start(mockCoreStart, {
taskManager: mockTaskManager,
});
expect(scheduleDashboardTelemetry).toHaveBeenCalledTimes(1);
expect(await mockTaskManager.runSoon).toHaveBeenCalledTimes(1);
expect(response).toEqual({});
});
});
});

View file

@ -83,9 +83,13 @@ export class DashboardPlugin
this.logger.debug('dashboard: Started');
if (plugins.taskManager) {
scheduleDashboardTelemetry(this.logger, plugins.taskManager);
plugins.taskManager.runNow(TASK_ID);
scheduleDashboardTelemetry(this.logger, plugins.taskManager)
.then(async () => {
await plugins.taskManager.runSoon(TASK_ID);
})
.catch((e) => {
this.logger.debug(`Error scheduling task, received ${e.message}`);
});
}
return {};

View file

@ -46,10 +46,8 @@ export function initializeDashboardTelemetryTask(
registerDashboardTelemetryTask(logger, core, taskManager, embeddable);
}
export function scheduleDashboardTelemetry(logger: Logger, taskManager?: TaskManagerStartContract) {
if (taskManager) {
scheduleTasks(logger, taskManager);
}
export function scheduleDashboardTelemetry(logger: Logger, taskManager: TaskManagerStartContract) {
return scheduleTasks(logger, taskManager);
}
function registerDashboardTelemetryTask(
@ -69,7 +67,7 @@ function registerDashboardTelemetryTask(
async function scheduleTasks(logger: Logger, taskManager: TaskManagerStartContract) {
try {
await taskManager.ensureScheduled({
return await taskManager.ensureScheduled({
id: TASK_ID,
taskType: TELEMETRY_TASK_TYPE,
state: { byDate: {}, suggestionsByDate: {}, saved: {}, runs: 0 },

View file

@ -50,7 +50,7 @@ export function getBeforeSetup(
jest.resetAllMocks();
rulesClientParams.createAPIKey.mockResolvedValue({ apiKeysEnabled: false });
rulesClientParams.getUserName.mockResolvedValue('elastic');
taskManager.runNow.mockResolvedValue({ id: '' });
taskManager.runSoon.mockResolvedValue({ id: '' });
const actionsClient = actionsClientMock.create();
actionsClient.getBulk.mockResolvedValueOnce([

View file

@ -1581,7 +1581,7 @@ describe('update()', () => {
],
});
taskManager.runNow.mockReturnValueOnce(Promise.resolve({ id: alertId }));
taskManager.runSoon.mockReturnValueOnce(Promise.resolve({ id: alertId }));
}
test('updating the alert schedule should call taskManager.bulkUpdateSchedules', async () => {

View file

@ -310,7 +310,7 @@ beforeEach(() => {
rulesClientParams.createAPIKey.mockResolvedValue({ apiKeysEnabled: false });
rulesClientParams.getUserName.mockResolvedValue('elastic');
taskManager.runNow.mockResolvedValue({ id: '' });
taskManager.runSoon.mockResolvedValue({ id: '' });
taskManager.schedule.mockResolvedValue({
id: 'scheduled-task-id',
scheduledAt: new Date(),

View file

@ -159,7 +159,7 @@ export async function createApmTelemetry({
logger.debug(
`Stored telemetry is out of date. Task will run immediately. Stored: ${currentData.kibanaVersion}, expected: ${kibanaVersion}`
);
await taskManagerStart.runNow(APM_TELEMETRY_TASK_NAME);
await taskManagerStart.runSoon(APM_TELEMETRY_TASK_NAME);
}
} catch (err) {
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {

View file

@ -23,7 +23,7 @@ export const debugTelemetryRoute = createApmServerRoute({
const taskManagerStart = await plugins.taskManager?.start();
const savedObjectsClient = coreContext.savedObjects.client;
await taskManagerStart?.runNow?.(APM_TELEMETRY_TASK_NAME);
await taskManagerStart?.runSoon?.(APM_TELEMETRY_TASK_NAME);
const apmTelemetryObject = await savedObjectsClient.get(
APM_TELEMETRY_SAVED_OBJECT_TYPE,

View file

@ -325,7 +325,7 @@ The _Start_ Plugin api allow you to use Task Manager to facilitate your Plugin's
schedule: (taskInstance: TaskInstanceWithDeprecatedFields, options?: any) => {
// ...
},
runNow: (taskId: string) => {
runSoon: (taskId: string) => {
// ...
},
bulkUpdateSchedules: (taskIds: string[], schedule: IntervalSchedule) => {
@ -394,8 +394,8 @@ The danger is that in such a situation, a Task with that same `id` might already
To achieve this you should use the `ensureScheduling` api which has the exact same behavior as `schedule`, except it allows the scheduling of a Task with an `id` that's already in assigned to another Task and it will assume that the existing Task is the one you wished to `schedule`, treating this as a successful operation.
#### runNow
Using `runNow` you can instruct TaskManger to run an existing task on-demand, without waiting for its scheduled time to be reached.
#### runSoon
Using `runSoon` you can instruct TaskManager to run an existing task as soon as possible by updating the next scheduled run date to be now
```js
export class Plugin {
@ -407,7 +407,7 @@ export class Plugin {
public start(core: CoreStart, plugins: { taskManager }) {
try {
const taskRunResult = await taskManager.runNow('91760f10-ba42-de9799');
const taskRunResult = await taskManager.runSoon('91760f10-ba42-de9799');
// If no error is thrown, the task has completed successfully.
} catch(err: Error) {
// If running the task has failed, we throw an error with an appropriate message.
@ -506,20 +506,20 @@ Task Manager's _push_ mechanism is driven by the following operations:
1. A polling interval has been reached.
2. A new Task is scheduled.
3. A Task is run using `runNow`.
The polling interval is straight forward: TaskPoller is configured to emit an event at a fixed interval.
That said, if there are no workers available, we want to ignore these events, so we'll throttle the interval on worker availability.
Whenever a user uses the `schedule` api to schedule a new Task, we want to trigger an early polling in order to respond to the newly scheduled task as soon as possible, but this too we only wish to do if there are available workers, so we can throttle this too.
When a `runNow` call is made we need to force a poll as the user will now be waiting on the result of the `runNow` call, but
there is a complexity here- we don't want to force polling (as there might not be any worker capacity and it's possible that a polling cycle is already running), but we also can't throttle, as we can't afford to "drop" these requests, so we'll have to buffer these.
However, besides above operations `runSoon` can be used to run a task.
`runSoon` updates a tasks `runAt` and `scheduledAt` properties with current date-time stamp.
So the task would be picked up at the next TaskManager polling cycle by one of the Kibana instances that has capacity.
We now want to respond to all three of these push events, but we still need to balance against our worker capacity, so if there are too many requests buffered, we only want to `take` as many requests as we have capacity to handle.
Luckily, `Polling Interval` and `Task Scheduled` simply denote a request to "poll for work as soon as possible", unlike `Run Task Now` which also means "poll for these specific tasks", so our worker capacity only needs to be applied to `Run Task Now`.
We now want to respond to all three of these events, but we still need to balance against our worker capacity, so if there are too many requests buffered, we only want to `take` as many requests as we have capacity to handle.
Luckily, `Polling Interval` and `Task Scheduled` simply denote a request to "poll for work as soon as possible", and `Run Task Soon` simply adds the task to the current buffer.
We achieve this model by buffering requests into a queue using a Set (which removes duplicated). As we don't want an unbounded queue in our system, we have limited the size of this queue (configurable by the `xpack.task_manager.request_capacity` config, defaulting to 1,000 requests) which forces us to throw an error once this cap is reachedand to all subsequent calls to `runNow` until the queue drain bellow the cap.
We achieve this model by buffering requests into a queue using a Set (which removes duplicated). As we don't want an unbounded queue in our system, we have limited the size of this queue (configurable by the `xpack.task_manager.request_capacity` config, defaulting to 1,000 requests) which forces us to throw an error once this cap is reached until the queue drain bellow the cap.
Our current model, then, is this:
```

View file

@ -22,7 +22,7 @@ const createStartMock = () => {
get: jest.fn(),
remove: jest.fn(),
schedule: jest.fn(),
runNow: jest.fn(),
runSoon: jest.fn(),
ephemeralRunNow: jest.fn(),
ensureScheduled: jest.fn(),
removeIfExists: jest.fn(),

View file

@ -48,7 +48,7 @@ export interface TaskManagerSetupContract {
export type TaskManagerStartContract = Pick<
TaskScheduling,
'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules'
'schedule' | 'runSoon' | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules'
> &
Pick<TaskStore, 'fetch' | 'get' | 'remove'> & {
removeIfExists: TaskStore['remove'];
@ -237,7 +237,7 @@ export class TaskManagerPlugin
removeIfExists: (id: string) => removeIfExists(taskStore, id),
schedule: (...args) => taskScheduling.schedule(...args),
ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
runNow: (...args) => taskScheduling.runNow(...args),
runSoon: (...args) => taskScheduling.runSoon(...args),
bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),
ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task),
supportsEphemeralTasks: () => this.config.ephemeral_tasks.enabled,

View file

@ -11,7 +11,7 @@ const createTaskSchedulingMock = () => {
return {
ensureScheduled: jest.fn(),
schedule: jest.fn(),
runNow: jest.fn(),
runSoon: jest.fn(),
ephemeralRunNow: jest.fn(),
} as unknown as jest.Mocked<TaskScheduling>;
};

View file

@ -6,22 +6,14 @@
*/
import { Subject } from 'rxjs';
import { none, some } from 'fp-ts/lib/Option';
import moment from 'moment';
import {
asTaskMarkRunningEvent,
asTaskRunEvent,
asTaskClaimEvent,
asTaskRunRequestEvent,
TaskClaimErrorType,
TaskPersistence,
} from './task_events';
import { asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { taskPollingLifecycleMock } from './polling_lifecycle.mock';
import { TaskScheduling } from './task_scheduling';
import { asErr, asOk } from './lib/result_type';
import { ConcreteTaskInstance, TaskLifecycleResult, TaskStatus } from './task';
import { ConcreteTaskInstance, TaskStatus } from './task';
import { createInitialMiddleware } from './lib/middleware';
import { taskStoreMock } from './task_store.mock';
import { TaskRunResult } from './task_running';
@ -52,7 +44,7 @@ describe('TaskScheduling', () => {
middleware: createInitialMiddleware(),
definitions,
ephemeralTaskLifecycle: ephemeralTaskLifecycleMock.create({}),
taskManagerId: '',
taskManagerId: '123',
};
definitions.registerTaskDefinitions({
@ -269,308 +261,131 @@ describe('TaskScheduling', () => {
expect(bulkUpdatePayload[0].runAt.getTime()).toBeLessThanOrEqual(Date.now());
});
});
describe('runNow', () => {
test('resolves when the task run succeeds', () => {
const events$ = new Subject<TaskLifecycleEvent>();
describe('runSoon', () => {
test('resolves when the task update succeeds', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Idle }));
mockTaskStore.update.mockResolvedValueOnce(mockTask({ id }));
const result = taskScheduling.runNow(id);
const result = await taskScheduling.runSoon(id);
const task = mockTask({ id });
events$.next(
asTaskRunEvent(
expect(mockTaskStore.update).toHaveBeenCalledWith(
mockTask({
id,
asOk({ task, result: TaskRunResult.Success, persistence: TaskPersistence.Recurring })
)
status: TaskStatus.Idle,
runAt: expect.any(Date),
scheduledAt: expect.any(Date),
})
);
return expect(result).resolves.toEqual({ id });
expect(mockTaskStore.get).toHaveBeenCalledWith(id);
expect(result).toEqual({ id });
});
test('rejects when the task run fails', () => {
const events$ = new Subject<TaskLifecycleEvent>();
test('runs failed tasks too', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Failed }));
mockTaskStore.update.mockResolvedValueOnce(mockTask({ id }));
const result = taskScheduling.runNow(id);
const task = mockTask({ id });
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asOk(task)));
events$.next(
asTaskRunEvent(
const result = await taskScheduling.runSoon(id);
expect(mockTaskStore.update).toHaveBeenCalledWith(
mockTask({
id,
asErr({
task,
error: new Error('some thing gone wrong'),
result: TaskRunResult.Failed,
persistence: TaskPersistence.Recurring,
})
)
status: TaskStatus.Idle,
runAt: expect.any(Date),
scheduledAt: expect.any(Date),
})
);
expect(mockTaskStore.get).toHaveBeenCalledWith(id);
expect(result).toEqual({ id });
});
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
test('rejects when the task update fails', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Idle }));
mockTaskStore.update.mockRejectedValueOnce(500);
const result = taskScheduling.runSoon(id);
await expect(result).rejects.toEqual(500);
expect(taskSchedulingOpts.logger.error).toHaveBeenCalledWith(
'Failed to update the task (01ddff11-e88a-4d13-bc4e-256164e755e2) for runSoon'
);
});
test('rejects when the task mark as running fails', () => {
const events$ = new Subject<TaskLifecycleEvent>();
test('ignores 409 conflict errors', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Idle }));
mockTaskStore.update.mockRejectedValueOnce({ statusCode: 409 });
const result = taskScheduling.runNow(id);
const task = mockTask({ id });
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong'))));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
const result = await taskScheduling.runSoon(id);
expect(result).toEqual({ id });
expect(taskSchedulingOpts.logger.debug).toHaveBeenCalledWith(
'Failed to update the task (01ddff11-e88a-4d13-bc4e-256164e755e2) for runSoon due to conflict (409)'
);
});
test('when a task claim fails we ensure the task exists', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
test('rejects when the task is being claimed', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.getLifecycle.mockResolvedValue(TaskLifecycleResult.NotFound);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(
asTaskClaimEvent(
id,
asErr({ task: none, errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_RETURNED })
)
);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Claiming }));
mockTaskStore.update.mockRejectedValueOnce(409);
const result = taskScheduling.runSoon(id);
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it does not exist`)
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim due to insufficient capacity we return an explciit message', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskLifecycleResult.NotFound);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
const task = mockTask({ id, taskType: 'foo' });
events$.next(
asTaskClaimEvent(
id,
asErr({ task: some(task), errorType: TaskClaimErrorType.CLAIMED_BY_ID_OUT_OF_CAPACITY })
Error(
'Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" as it is currently running'
)
);
});
test('rejects when the task is already running', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Running }));
mockTaskStore.update.mockRejectedValueOnce(409);
const result = taskScheduling.runSoon(id);
await expect(result).rejects.toEqual(
new Error(
`Failed to run task "${id}" as we would exceed the max concurrency of "${task.taskType}" which is 2. Rescheduled the task to ensure it is picked up as soon as possible.`
Error(
'Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" as it is currently running'
)
);
});
test('when a task claim fails we ensure the task isnt already claimed', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
test('rejects when the task status is Unrecognized', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Claiming);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(
asTaskClaimEvent(
id,
asErr({ task: none, errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_RETURNED })
)
);
mockTaskStore.get.mockResolvedValueOnce(mockTask({ id, status: TaskStatus.Unrecognized }));
mockTaskStore.update.mockRejectedValueOnce(409);
const result = taskScheduling.runSoon(id);
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Error('Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" with status unrecognized')
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we ensure the task isnt already running', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
test('rejects when the task does not exist', async () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Running);
mockTaskStore.get.mockRejectedValueOnce(404);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(
asTaskClaimEvent(
id,
asErr({ task: none, errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_RETURNED })
)
);
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('rejects when the task run fails due to capacity', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Idle);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(asTaskRunRequestEvent(id, asErr(new Error('failed to buffer request'))));
await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}": Task Manager is at capacity, please try again later`)
);
expect(mockTaskStore.getLifecycle).not.toHaveBeenCalled();
});
test('when a task claim fails we return the underlying error if the task is idle', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Idle);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(
asTaskClaimEvent(
id,
asErr({ task: none, errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_RETURNED })
)
);
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]`
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('when a task claim fails we return the underlying error if the task is failed', async () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
mockTaskStore.getLifecycle.mockResolvedValue(TaskStatus.Failed);
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
events$.next(
asTaskClaimEvent(
id,
asErr({ task: none, errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_RETURNED })
)
);
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]`
);
expect(mockTaskStore.getLifecycle).toHaveBeenCalledWith(id);
});
test('ignores task run success of other tasks', () => {
const events$ = new Subject<TaskLifecycleEvent>();
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
const differentTask = '4bebf429-181b-4518-bb7d-b4246d8a35f0';
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
taskPollingLifecycle: taskPollingLifecycleMock.create({ events$ }),
});
const result = taskScheduling.runNow(id);
const task = mockTask({ id });
const otherTask = { id: differentTask } as ConcreteTaskInstance;
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskClaimEvent(differentTask, asOk(otherTask)));
events$.next(
asTaskRunEvent(
differentTask,
asOk({
task: otherTask,
result: TaskRunResult.Success,
persistence: TaskPersistence.Recurring,
})
)
);
events$.next(
asTaskRunEvent(
id,
asErr({
task,
error: new Error('some thing gone wrong'),
result: TaskRunResult.Failed,
persistence: TaskPersistence.Recurring,
})
)
);
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
const result = taskScheduling.runSoon(id);
await expect(result).rejects.toEqual(404);
});
});
describe('ephemeralRunNow', () => {
test('runs a task ephemerally', async () => {
const ephemeralEvents$ = new Subject<TaskLifecycleEvent>();
const ephemeralTask = mockTask({

View file

@ -8,37 +8,37 @@
import { filter, take } from 'rxjs/operators';
import pMap from 'p-map';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, map as mapOptional, getOrElse, isSome } from 'fp-ts/lib/Option';
import { getOrElse, isSome, map as mapOptional, Option } from 'fp-ts/lib/Option';
import uuid from 'uuid';
import { pick, chunk } from 'lodash';
import { chunk, pick } from 'lodash';
import { merge, Subject } from 'rxjs';
import agent from 'elastic-apm-node';
import { Logger } from '@kbn/core/server';
import { mustBeAllOf } from './queries/query_clauses';
import { asOk, either, map, mapErr, promiseResult, isErr } from './lib/result_type';
import { asOk, either, isErr, map, mapErr, promiseResult } from './lib/result_type';
import {
isTaskRunEvent,
isTaskClaimEvent,
isTaskRunRequestEvent,
RanTask,
ErroredTask,
OkResultOf,
ErrResultOf,
ClaimTaskErr,
ErroredTask,
ErrResultOf,
isTaskClaimEvent,
isTaskRunEvent,
isTaskRunRequestEvent,
OkResultOf,
RanTask,
TaskClaimErrorType,
} from './task_events';
import { Middleware } from './lib/middleware';
import { parseIntervalAsMillisecond } from './lib/intervals';
import {
ConcreteTaskInstance,
TaskInstanceWithId,
EphemeralTask,
IntervalSchedule,
TaskInstanceWithDeprecatedFields,
TaskInstanceWithId,
TaskLifecycle,
TaskLifecycleResult,
TaskStatus,
EphemeralTask,
IntervalSchedule,
} from './task';
import { TaskStore } from './task_store';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
@ -73,6 +73,10 @@ export interface BulkUpdateSchedulesResult {
*/
errors: Array<{ task: ConcreteTaskInstance; error: Error }>;
}
export interface RunSoonResult {
id: ConcreteTaskInstance['id'];
}
export interface RunNowResult {
id: ConcreteTaskInstance['id'];
state?: ConcreteTaskInstance['state'];
@ -201,23 +205,28 @@ export class TaskScheduling {
* Run task.
*
* @param taskId - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
* @returns {Promise<RunSoonResult>}
*/
public async runNow(taskId: string): Promise<RunNowResult> {
return new Promise(async (resolve, reject) => {
try {
this.awaitTaskRunResult(taskId) // don't expose state on runNow
.then(({ id }) =>
resolve({
id,
})
)
.catch(reject);
this.taskPollingLifecycle.attemptToRun(taskId);
} catch (error) {
reject(error);
public async runSoon(taskId: string): Promise<RunSoonResult> {
const task = await this.getNonRunningTask(taskId);
try {
await this.store.update({
...task,
status: TaskStatus.Idle,
scheduledAt: new Date(),
runAt: new Date(),
});
} catch (e) {
if (e.statusCode === 409) {
this.logger.debug(
`Failed to update the task (${taskId}) for runSoon due to conflict (409)`
);
} else {
this.logger.error(`Failed to update the task (${taskId}) for runSoon`);
throw e;
}
});
}
return { id: task.id };
}
/**
@ -395,6 +404,20 @@ export class TaskScheduling {
)
);
}
private async getNonRunningTask(taskId: string) {
const task = await this.store.get(taskId);
switch (task.status) {
case TaskStatus.Claiming:
case TaskStatus.Running:
throw Error(`Failed to run task "${taskId}" as it is currently running`);
case TaskStatus.Unrecognized:
throw Error(`Failed to run task "${taskId}" with status ${task.status}`);
case TaskStatus.Failed:
default:
return task;
}
}
}
const cancellablePromise = () => {

View file

@ -333,7 +333,7 @@ export function defineRoutes(
router.post(
{
path: `/api/alerting_actions_telemetry/run_now`,
path: `/api/alerting_actions_telemetry/run_soon`,
validate: {
body: schema.object({
taskId: schema.string({
@ -359,7 +359,7 @@ export function defineRoutes(
const { taskId } = req.body;
try {
const taskManager = await taskManagerStart;
return res.ok({ body: await taskManager.runNow(taskId) });
return res.ok({ body: await taskManager.runSoon(taskId) });
} catch (err) {
return res.ok({ body: { id: taskId, error: `${err}` } });
}

View file

@ -159,22 +159,23 @@ export default function createActionsTelemetryTests({ getService }: FtrProviderC
// request telemetry task to run
await supertest
.post('/api/alerting_actions_telemetry/run_now')
.post('/api/alerting_actions_telemetry/run_soon')
.set('kbn-xsrf', 'xxx')
.send({ taskId: 'Actions-actions_telemetry' })
.expect(200);
// get telemetry task doc
const telemetryTask = await es.get<TaskManagerDoc>({
id: `task:Actions-actions_telemetry`,
index: '.kibana_task_manager',
let telemetry: any;
await retry.try(async () => {
const telemetryTask = await es.get<TaskManagerDoc>({
id: `task:Actions-actions_telemetry`,
index: '.kibana_task_manager',
});
expect(telemetryTask!._source!.task?.status).to.be('idle');
const taskState = telemetryTask!._source!.task?.state;
expect(taskState).not.to.be(undefined);
telemetry = JSON.parse(taskState!);
expect(telemetry.count_total).to.equal(19);
});
const taskState = telemetryTask?._source?.task?.state;
expect(taskState).not.to.be(undefined);
const telemetry = JSON.parse(taskState!);
// total number of connectors
expect(telemetry.count_total).to.equal(19);
// total number of active connectors (used by a rule)
expect(telemetry.count_active_total).to.equal(7);

View file

@ -6,12 +6,13 @@
*/
import expect from '@kbn/expect';
import { TaskRunning, TaskRunningStage } from '@kbn/task-manager-plugin/server/task_running';
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { Spaces, Superuser } from '../../../scenarios';
import {
getUrlPrefix,
getEventLog,
getTestRuleData,
TaskManagerDoc,
ESTestIndexTool,
} from '../../../../common/lib';
import { FtrProviderContext } from '../../../../common/ftr_provider_context';
@ -216,22 +217,32 @@ export default function createAlertingTelemetryTests({ getService }: FtrProvider
// request telemetry task to run
await supertest
.post('/api/alerting_actions_telemetry/run_now')
.post('/api/alerting_actions_telemetry/run_soon')
.set('kbn-xsrf', 'xxx')
.send({ taskId: 'Alerting-alerting_telemetry' })
.expect(200);
// get telemetry task doc
const telemetryTask = await es.get<TaskManagerDoc>({
id: `task:Alerting-alerting_telemetry`,
index: '.kibana_task_manager',
});
const taskState = telemetryTask?._source?.task?.state;
expect(taskState).not.to.be(undefined);
const telemetry = JSON.parse(taskState!);
let telemetry: any;
// total number of rules
expect(telemetry.count_total).to.equal(21);
await retry.try(async () => {
const resp = await es.search<TaskRunning<TaskRunningStage.RAN, ConcreteTaskInstance>>({
index: '.kibana_task_manager',
body: {
query: {
term: {
_id: `task:Alerting-alerting_telemetry`,
},
},
},
});
const task = resp.hits.hits[0]?._source?.task;
expect(task?.status).to.be('idle');
const taskState = task?.state;
expect(taskState).not.to.be(undefined);
telemetry = JSON.parse(String(taskState!));
// total number of rules
expect(telemetry.count_total).to.equal(21);
});
// total number of enabled rules
expect(telemetry.count_active_total).to.equal(18);

View file

@ -85,7 +85,7 @@ export function initRoutes(
router.post(
{
path: `/api/sample_tasks/run_now`,
path: `/api/sample_tasks/run_soon`,
validate: {
body: schema.object({
task: schema.object({
@ -104,7 +104,7 @@ export function initRoutes(
} = req.body;
try {
const taskManager = await taskManagerStart;
return res.ok({ body: await taskManager.runNow(id) });
return res.ok({ body: await taskManager.runSoon(id) });
} catch (err) {
return res.ok({ body: { id, error: `${err}` } });
}

View file

@ -6,11 +6,11 @@
*/
import moment from 'moment';
import { random, times } from 'lodash';
import { random } from 'lodash';
import expect from '@kbn/expect';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import TaskManagerMapping from '@kbn/task-manager-plugin/server/saved_objects/mappings.json';
import { DEFAULT_MAX_WORKERS, DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config';
import { DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config';
import { ConcreteTaskInstance, BulkUpdateSchedulesResult } from '@kbn/task-manager-plugin/server';
import { FtrProviderContext } from '../../ftr_provider_context';
@ -169,9 +169,9 @@ export default function ({ getService }: FtrProviderContext) {
});
}
function runTaskNow(task: { id: string }) {
function runTaskSoon(task: { id: string }) {
return supertest
.post('/api/sample_tasks/run_now')
.post('/api/sample_tasks/run_soon')
.set('kbn-xsrf', 'xxx')
.send({ task })
.expect(200)
@ -434,11 +434,11 @@ export default function ({ getService }: FtrProviderContext) {
});
const now = Date.now();
const runNowResult = await runTaskNow({
const runSoonResult = await runTaskSoon({
id: originalTask.id,
});
expect(runNowResult).to.eql({ id: originalTask.id });
expect(runSoonResult).to.eql({ id: originalTask.id });
await retry.try(async () => {
expect(
@ -456,81 +456,6 @@ export default function ({ getService }: FtrProviderContext) {
});
});
it('should prioritize tasks which are called using runNow', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
schedule: { interval: `30m` },
params: {},
});
await retry.try(async () => {
const docs = await historyDocs(originalTask.id);
expect(docs.length).to.eql(1);
const task = await currentTask<{ count: number }>(originalTask.id);
expect(task.state.count).to.eql(1);
// ensure this task shouldnt run for another half hour
expectReschedule(Date.parse(originalTask.runAt), task, 30 * 60000);
});
const taskToBeReleased = await scheduleTask({
taskType: 'sampleTask',
params: { waitForEvent: 'releaseSingleTask' },
});
await retry.try(async () => {
// wait for taskToBeReleased to stall
expect((await historyDocs(taskToBeReleased.id)).length).to.eql(1);
});
// schedule multiple tasks that should force
// Task Manager to use up its worker capacity
// causing tasks to pile up
await Promise.all(
times(DEFAULT_MAX_WORKERS + random(1, DEFAULT_MAX_WORKERS), () =>
scheduleTask({
taskType: 'sampleTask',
params: {
waitForEvent: 'releaseTheOthers',
},
})
)
);
// we need to ensure that TM has a chance to fill its queue with the stalling tasks
await delay(DEFAULT_POLL_INTERVAL);
// call runNow for our task
const runNowResult = runTaskNow({
id: originalTask.id,
});
// we need to ensure that TM has a chance to push the runNow task into the queue
// before we release the stalled task, so lets give it a chance
await delay(DEFAULT_POLL_INTERVAL);
// and release only one slot in our worker queue
await releaseTasksWaitingForEventToComplete('releaseSingleTask');
expect(await runNowResult).to.eql({ id: originalTask.id });
await retry.try(async () => {
const task = await currentTask<{ count: number }>(originalTask.id);
expect(task.state.count).to.eql(2);
});
// drain tasks, othrwise they'll keep Task Manager stalled
await retry.try(async () => {
await releaseTasksWaitingForEventToComplete('releaseTheOthers');
const tasks = (
await currentTasks<{}, { originalParams: { waitForEvent: string } }>()
).docs.filter((task) => task.params.originalParams.waitForEvent === 'releaseTheOthers');
expect(tasks.length).to.eql(0);
});
});
it('should only run as many instances of a task as its maxConcurrency will allow', async () => {
// should run as there's only one and maxConcurrency on this TaskType is 1
const firstWithSingleConcurrency = await scheduleTask({
@ -614,112 +539,6 @@ export default function ({ getService }: FtrProviderContext) {
await releaseTasksWaitingForEventToComplete('releaseSecondWaveOfTasks');
});
it('should return a task run error result when RunNow is called at a time that would cause the task to exceed its maxConcurrency', async () => {
// should run as there's only one and maxConcurrency on this TaskType is 1
const firstWithSingleConcurrency = await scheduleTask({
taskType: 'sampleTaskWithSingleConcurrency',
// include a schedule so that the task isn't deleted after completion
schedule: { interval: `30m` },
params: {
waitForEvent: 'releaseRunningTaskWithSingleConcurrencyFirst',
},
});
// should not run as the first is running
const secondWithSingleConcurrency = await scheduleTask({
taskType: 'sampleTaskWithSingleConcurrency',
params: {
waitForEvent: 'releaseRunningTaskWithSingleConcurrencySecond',
},
});
// run the first tasks once just so that we can be sure it runs in response to our
// runNow callm, rather than the initial execution
await retry.try(async () => {
expect((await historyDocs(firstWithSingleConcurrency.id)).length).to.eql(1);
});
await releaseTasksWaitingForEventToComplete('releaseRunningTaskWithSingleConcurrencyFirst');
// wait for second task to stall
await retry.try(async () => {
expect((await historyDocs(secondWithSingleConcurrency.id)).length).to.eql(1);
});
// run the first task again using runNow - should fail due to concurrency concerns
const failedRunNowResult = await runTaskNow({
id: firstWithSingleConcurrency.id,
});
expect(failedRunNowResult).to.eql({
id: firstWithSingleConcurrency.id,
error: `Error: Failed to run task "${firstWithSingleConcurrency.id}" as we would exceed the max concurrency of "Sample Task With Single Concurrency" which is 1. Rescheduled the task to ensure it is picked up as soon as possible.`,
});
// release the second task
await releaseTasksWaitingForEventToComplete('releaseRunningTaskWithSingleConcurrencySecond');
});
it('should return a task run error result when running a task now fails', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
schedule: { interval: `30m` },
params: { failWith: 'this task was meant to fail!', failOn: 3 },
});
await retry.try(async () => {
const docs = await historyDocs();
expect(docs.filter((taskDoc) => taskDoc._source.taskId === originalTask.id).length).to.eql(
1
);
const task = await currentTask<{ count: number }>(originalTask.id);
expect(task.state.count).to.eql(1);
expect(task.status).to.eql('idle');
// ensure this task shouldnt run for another half hour
expectReschedule(Date.parse(originalTask.runAt), task, 30 * 60000);
});
await ensureTasksIndexRefreshed();
// second run should still be successful
const successfulRunNowResult = await runTaskNow({
id: originalTask.id,
});
expect(successfulRunNowResult).to.eql({ id: originalTask.id });
await retry.try(async () => {
const task = await currentTask<{ count: number }>(originalTask.id);
expect(task.state.count).to.eql(2);
expect(task.status).to.eql('idle');
});
await ensureTasksIndexRefreshed();
// flaky: runTaskNow() sometimes fails with the following error, so retrying
// error: Failed to run task "<id>" as it is currently running
await retry.try(async () => {
const failedRunNowResult = await runTaskNow({
id: originalTask.id,
});
expect(failedRunNowResult).to.eql({
id: originalTask.id,
error: `Error: Failed to run task \"${originalTask.id}\": Error: this task was meant to fail!`,
});
});
await retry.try(async () => {
expect(
(await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === originalTask.id)
.length
).to.eql(2);
const task = await currentTask(originalTask.id);
expect(task.attempts).to.eql(1);
});
});
it('should increment attempts when task fails on markAsRunning', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
@ -736,12 +555,12 @@ export default function ({ getService }: FtrProviderContext) {
});
it('should return a task run error result when trying to run a non-existent task', async () => {
// runNow should fail
const failedRunNowResult = await runTaskNow({
// runSoon should fail
const failedRunSoonResult = await runTaskSoon({
id: 'i-dont-exist',
});
expect(failedRunNowResult).to.eql({
error: `Error: Failed to run task "i-dont-exist" as it does not exist`,
expect(failedRunSoonResult).to.eql({
error: `Error: Saved object [task/i-dont-exist] not found`,
id: 'i-dont-exist',
});
});
@ -755,9 +574,9 @@ export default function ({ getService }: FtrProviderContext) {
},
});
// tell the task to wait for the 'runNowHasBeenAttempted' event
// tell the task to wait for the 'runSoonHasBeenAttempted' event
await provideParamsToTasksWaitingForParams(longRunningTask.id, {
waitForEvent: 'runNowHasBeenAttempted',
waitForEvent: 'runSoonHasBeenAttempted',
});
await retry.try(async () => {
@ -772,18 +591,18 @@ export default function ({ getService }: FtrProviderContext) {
await ensureTasksIndexRefreshed();
// first runNow should fail
const failedRunNowResult = await runTaskNow({
// first runSoon should fail
const failedRunSoonResult = await runTaskSoon({
id: longRunningTask.id,
});
expect(failedRunNowResult).to.eql({
expect(failedRunSoonResult).to.eql({
error: `Error: Failed to run task "${longRunningTask.id}" as it is currently running`,
id: longRunningTask.id,
});
// finish first run by emitting 'runNowHasBeenAttempted' event
await releaseTasksWaitingForEventToComplete('runNowHasBeenAttempted');
// finish first run by emitting 'runSoonHasBeenAttempted' event
await releaseTasksWaitingForEventToComplete('runSoonHasBeenAttempted');
await retry.try(async () => {
const tasks = (await currentTasks<{ count: number }>()).docs;
expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1);
@ -794,44 +613,14 @@ export default function ({ getService }: FtrProviderContext) {
await ensureTasksIndexRefreshed();
// second runNow should be successful
const successfulRunNowResult = runTaskNow({
// second runSoon should be successful
const successfulRunSoonResult = runTaskSoon({
id: longRunningTask.id,
});
await provideParamsToTasksWaitingForParams(longRunningTask.id);
expect(await successfulRunNowResult).to.eql({ id: longRunningTask.id });
});
it('should allow a failed task to be rerun using runNow', async () => {
const taskThatFailsBeforeRunNow = await scheduleTask({
taskType: 'singleAttemptSampleTask',
params: {
waitForParams: true,
},
});
// tell the task to fail on its next run
await provideParamsToTasksWaitingForParams(taskThatFailsBeforeRunNow.id, {
failWith: 'error on first run',
});
// wait for task to fail
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, taskThatFailsBeforeRunNow.id).status).to.eql('failed');
});
// runNow should be successfully run the failing task
const runNowResultWithExpectedFailure = runTaskNow({
id: taskThatFailsBeforeRunNow.id,
});
// release the task without failing this time
await provideParamsToTasksWaitingForParams(taskThatFailsBeforeRunNow.id);
expect(await runNowResultWithExpectedFailure).to.eql({ id: taskThatFailsBeforeRunNow.id });
expect(await successfulRunSoonResult).to.eql({ id: longRunningTask.id });
});
function expectReschedule(
@ -973,7 +762,7 @@ export default function ({ getService }: FtrProviderContext) {
params: {},
});
runTaskNow({ id: longRunningTask.id });
runTaskSoon({ id: longRunningTask.id });
let scheduledRunAt: string;
// ensure task is running and store scheduled runAt
@ -1004,6 +793,39 @@ export default function ({ getService }: FtrProviderContext) {
expect(task.runAt).to.eql(scheduledRunAt);
});
});
it('should allow a failed task to be rerun using runSoon', async () => {
const taskThatFailsBeforeRunNow = await scheduleTask({
taskType: 'singleAttemptSampleTask',
params: {
waitForParams: true,
},
});
// tell the task to fail on its next run
await provideParamsToTasksWaitingForParams(taskThatFailsBeforeRunNow.id, {
failWith: 'error on first run',
});
// wait for task to fail
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, taskThatFailsBeforeRunNow.id).status).to.eql('failed');
});
// run the task again
await runTaskSoon({
id: taskThatFailsBeforeRunNow.id,
});
// runTaskSoon should successfully update the runAt property of the task
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(
Date.parse(getTaskById(tasks, taskThatFailsBeforeRunNow.id).runAt)
).to.be.greaterThan(Date.parse(taskThatFailsBeforeRunNow.runAt));
});
});
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('should return the resulting task state when asked to run an ephemeral task now', async () => {
// const ephemeralTask = await runEphemeralTaskNow({