[Response Ops] Remove ephemeral tasks from task manager plugin (#201313)

## Summary

Resolves: https://github.com/elastic/kibana/issues/151463

Removes all references to ephemeral tasks from the task manager plugin,
as well as the related unit and E2E tests, while maintaining backwards
compatibility so that the `xpack.task_manager.ephemeral_tasks` flag
no-ops if set. This PR depends on the PR that removes ephemeral task
support from the alerting and actions plugins
(https://github.com/elastic/kibana/pull/197421), so it should be merged
after that PR.

Deprecates the following configuration settings:

- xpack.task_manager.ephemeral_tasks.enabled
- xpack.task_manager.ephemeral_tasks.request_capacity

Users don't have to change anything on their end if they don't wish to.
With this deprecation, if the above settings are defined, Kibana simply
ignores them.
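
The backwards compatibility comes down to loosening the config schema so the old `xpack.task_manager.ephemeral_tasks.*` keys are still accepted but never read. A condensed sketch of the pattern used in the config changes below (the real schema enumerates every task manager setting):

```typescript
import { schema } from '@kbn/config-schema';

// Condensed sketch: the deprecated keys stay syntactically valid so existing
// kibana.yml files keep loading, but nothing in the plugin reads the values.
const taskManagerConfigSketch = schema.object({
  /* Allows old kibana config to start Kibana without crashing,
     since ephemeral tasks are deprecated. */
  ephemeral_tasks: schema.maybe(schema.any()),
  // ...all other task manager settings stay as they were.
});

// Validation no longer throws for the deprecated keys:
taskManagerConfigSketch.validate({
  ephemeral_tasks: { enabled: true, request_capacity: 20 },
});
```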

### Checklist
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
Jiawei Wu, 2024-12-13 14:31:31 -08:00, committed by GitHub
parent 07a69023d3
commit 5a9129e22d
51 changed files with 47 additions and 3123 deletions

View file

@ -33,16 +33,6 @@ This flag will enable automatic warn and error logging if task manager self dete
`xpack.task_manager.monitored_stats_health_verbose_log.warn_delayed_task_start_in_seconds`::
The number of seconds we allow a task to delay before printing a warning to the server log. Defaults to 60.
`xpack.task_manager.ephemeral_tasks.enabled`::
deprecated:[8.8.0]
Enables a technical preview feature that executes a limited (and configurable) number of actions in the same task as the alert which triggered them.
These action tasks reduce the latency between when an action is triggered and when it runs, but they are not persisted as SavedObjects.
These non-persisted action tasks risk not running at all if the Kibana instance running them exits unexpectedly. Defaults to false.
`xpack.task_manager.ephemeral_tasks.request_capacity`::
deprecated:[8.8.0]
Sets the size of the ephemeral queue defined above. Defaults to 10.
`xpack.task_manager.event_loop_delay.monitor`::
Enables event loop delay monitoring, which will log a warning when a task causes an event loop delay which exceeds the `warn_threshold` setting. Defaults to true.

View file

@ -181,7 +181,6 @@ The API returns the following:
"persistence": {
"recurring": 88,
"non_recurring": 4,
"ephemeral": 8
},
"result_frequency_percent_as_number": {
"alerting:.index-threshold": {
@ -608,11 +607,10 @@ Resolving that would require deeper investigation into the {kib} Server Log, whe
[[task-manager-theory-spikes-in-non-recurring-tasks]]
*Theory*:
Spikes in non-recurring and ephemeral tasks are consuming a high percentage of the available capacity
Spikes in non-recurring tasks are consuming a high percentage of the available capacity
*Diagnosis*:
Task Manager uses ad-hoc non-recurring tasks to load balance operations across multiple {kib} instances.
Additionally, {kib} can use Task Manager to allocate resources for expensive operations by executing an ephemeral task. Ephemeral tasks are identical in operation to non-recurring tasks, but are not persisted and cannot be load balanced across {kib} instances.
Evaluating the preceding health stats, you see the following output under `stats.runtime.value.execution.persistence`:
@ -620,13 +618,11 @@ Evaluating the preceding health stats, you see the following output under `stats
--------------------------------------------------
{
"recurring": 88, # <1>
"non_recurring": 4, # <2>
"ephemeral": 8 # <3>
"non_recurring": 12, # <2>
},
--------------------------------------------------
<1> 88% of executed tasks are recurring tasks
<2> 4% of executed tasks are non-recurring tasks
<3> 8% of executed tasks are ephemeral tasks
<2> 12% of executed tasks are non-recurring tasks
You can infer from these stats that the majority of executions consist of recurring tasks at 88%.
You can use the `execution.persistence` stats to evaluate the ratio of consumed capacity, but on their own, you should not make assumptions about the sufficiency of the available capacity.
@ -645,7 +641,7 @@ To assess the capacity, you should evaluate these stats against the `load` under
}
--------------------------------------------------
You can infer from these stats that it is very unusual for Task Manager to run out of capacity, so the capacity is likely sufficient to handle the amount of non-recurring and ephemeral tasks.
You can infer from these stats that it is very unusual for Task Manager to run out of capacity, so the capacity is likely sufficient to handle the amount of non-recurring tasks.
Suppose you have an alternate scenario, where you see the following output under `stats.runtime.value.execution.persistence`:
@ -653,15 +649,13 @@ Suppose you have an alternate scenario, where you see the following output under
--------------------------------------------------
{
"recurring": 60, # <1>
"non_recurring": 30, # <2>
"ephemeral": 10 # <3>
"non_recurring": 40, # <2>
},
--------------------------------------------------
<1> 60% of executed tasks are recurring tasks
<2> 30% of executed tasks are non-recurring tasks
<3> 10% of executed tasks are ephemeral tasks
<2> 40% of executed tasks are non-recurring tasks
You can infer from these stats that even though most executions are recurring tasks, a substantial percentage of executions are non-recurring and ephemeral tasks at 40%.
You can infer from these stats that even though most executions are recurring tasks, a substantial percentage of executions are non-recurring tasks at 40%.
Evaluating the `load` under `stats.runtime.value`, you see the following:
@ -678,9 +672,9 @@ Evaluating the `load` under `stats.runtime.value`, you see the following:
--------------------------------------------------
You can infer from these stats that it is quite common for this {kib} instance to run out of capacity.
Given the high rate of non-recurring and ephemeral tasks, it would be reasonable to assess that there is insufficient capacity in the {kib} cluster to handle the amount of tasks.
Given the high rate of non-recurring tasks, it would be reasonable to assess that there is insufficient capacity in the {kib} cluster to handle the amount of tasks.
Keep in mind that these stats give you a glimpse at a moment in time, and even though there has been insufficient capacity in recent minutes, this might not be true in other times where fewer non-recurring or ephemeral tasks are used. We recommend tracking these stats over time and identifying the source of these tasks before making sweeping changes to your infrastructure.
Keep in mind that these stats give you a glimpse at a moment in time, and even though there has been insufficient capacity in recent minutes, this might not be true in other times where fewer non-recurring tasks are used. We recommend tracking these stats over time and identifying the source of these tasks before making sweeping changes to your infrastructure.
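
As a toy illustration of the reasoning in this section (not part of the Kibana docs), the persistence ratios and load percentiles can be combined mechanically; the thresholds below are illustrative assumptions, not documented values:

```typescript
// Toy reading of the health stats discussed above; thresholds are illustrative.
interface PersistenceStats {
  recurring: number; // % of executed tasks that are recurring
  non_recurring: number; // % of executed tasks that are non-recurring
}

interface LoadStats {
  p50: number;
  p90: number;
  p95: number;
  p99: number;
}

function assessCapacity(persistence: PersistenceStats, load: LoadStats): string {
  // Sustained max load combined with a heavy non-recurring share suggests the
  // cluster lacks capacity for the ad-hoc workload.
  if (load.p90 >= 100 && persistence.non_recurring >= 30) {
    return 'likely insufficient capacity for the non-recurring workload';
  }
  if (load.p99 < 100) {
    return 'Task Manager rarely runs out of capacity; likely sufficient';
  }
  return 'inconclusive: track these stats over time before changing infrastructure';
}

// The second scenario in the docs: a 60/40 split with p90 load at 100%.
console.log(
  assessCapacity({ recurring: 60, non_recurring: 40 }, { p50: 80, p90: 100, p95: 100, p99: 100 })
);
```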
[[task-manager-health-evaluate-the-workload]]
===== Evaluate the Workload

View file

@ -2492,85 +2492,6 @@ describe('Task Runner', () => {
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
});
test('successfully executes the task with ephemeral tasks enabled', async () => {
const taskRunner = new TaskRunner({
ruleType,
internalSavedObjectsRepository,
taskInstance: {
...mockedTaskInstance,
state: {
...mockedTaskInstance.state,
previousStartedAt: new Date(Date.now() - 5 * 60 * 1000).toISOString(),
},
},
context: {
...taskRunnerFactoryInitializerParams,
},
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalled();
mockGetAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
const runnerResult = await taskRunner.run();
expect(runnerResult).toEqual(generateRunnerResult({ state: true, history: [true] }));
expect(ruleType.executor).toHaveBeenCalledTimes(1);
const call = ruleType.executor.mock.calls[0][0];
expect(call.params).toEqual({ bar: true });
expect(call.startedAt).toEqual(new Date(DATE_1970));
expect(call.previousStartedAt).toEqual(new Date(DATE_1970_5_MIN));
expect(call.state).toEqual({});
expect(call.rule).not.toBe(null);
expect(call.rule.id).toBe('1');
expect(call.rule.name).toBe(RULE_NAME);
expect(call.rule.tags).toEqual(['rule-', '-tags']);
expect(call.rule.consumer).toBe('bar');
expect(call.rule.enabled).toBe(true);
expect(call.rule.schedule).toEqual({ interval: '10s' });
expect(call.rule.createdBy).toBe('rule-creator');
expect(call.rule.updatedBy).toBe('rule-updater');
expect(call.rule.createdAt).toBe(mockDate);
expect(call.rule.updatedAt).toBe(mockDate);
expect(call.rule.notifyWhen).toBe('onActiveAlert');
expect(call.rule.throttle).toBe(null);
expect(call.rule.producer).toBe('alerts');
expect(call.rule.ruleTypeId).toBe('test');
expect(call.rule.ruleTypeName).toBe('My test rule');
expect(call.rule.actions).toEqual(RULE_ACTIONS);
expect(call.services.alertFactory.create).toBeTruthy();
expect(call.services.scopedClusterClient).toBeTruthy();
expect(call.services).toBeTruthy();
expect(logger.debug).toHaveBeenCalledTimes(5);
expect(logger.debug).nthCalledWith(1, 'executing rule test:1 at 1970-01-01T00:00:00.000Z', {
tags: ['1', 'test'],
});
expect(logger.debug).nthCalledWith(
2,
'deprecated ruleRunStatus for test:1: {"lastExecutionDate":"1970-01-01T00:00:00.000Z","status":"ok"}',
{ tags: ['1', 'test'] }
);
expect(logger.debug).nthCalledWith(
3,
'ruleRunStatus for test:1: {"outcome":"succeeded","outcomeOrder":0,"outcomeMsg":null,"warning":null,"alertsCount":{"active":0,"new":0,"recovered":0,"ignored":0}}',
{ tags: ['1', 'test'] }
);
expect(logger.debug).nthCalledWith(
4,
'ruleRunMetrics for test:1: {"numSearches":3,"totalSearchDurationMs":23423,"esSearchDurationMs":33,"numberOfTriggeredActions":0,"numberOfGeneratedActions":0,"numberOfActiveAlerts":0,"numberOfRecoveredAlerts":0,"numberOfNewAlerts":0,"numberOfDelayedAlerts":0,"hasReachedAlertLimit":false,"hasReachedQueuedActionsLimit":false,"triggeredActionsStatus":"complete"}',
{ tags: ['1', 'test'] }
);
testAlertingEventLogCalls({
status: 'ok',
});
expect(elasticsearchService.client.asInternalUser.update).toHaveBeenCalledWith(
...generateRuleUpdateParams({})
);
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
});
test('successfully stores successful runs', async () => {
const taskRunner = new TaskRunner({
ruleType,

View file

@ -19,10 +19,6 @@ describe('config validation', () => {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_threshold": 5000,
@ -82,10 +78,6 @@ describe('config validation', () => {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_threshold": 5000,
@ -143,10 +135,6 @@ describe('config validation', () => {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
},
"event_loop_delay": Object {
"monitor": true,
"warn_threshold": 5000,
@ -296,4 +284,15 @@ describe('config validation', () => {
`"[discovery.active_nodes_lookback]: active node lookback duration cannot exceed five minutes"`
);
});
test('should not throw if ephemeral_tasks is defined', () => {
const config: Record<string, unknown> = {
ephemeral_tasks: {
enabled: true,
request_capacity: 20,
},
};
expect(() => configSchema.validate(config)).not.toThrow();
});
});

View file

@ -16,7 +16,6 @@ export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const MGET_DEFAULT_POLL_INTERVAL = 500;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
export const DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY = MAX_WORKERS_LIMIT;
// Monitoring Constants
// ===================
@ -101,16 +100,8 @@ export const configSchema = schema.object(
max: MAX_DISCOVERY_INTERVAL_MS,
}),
}),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
defaultValue: 10,
min: 1,
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
/* Allows for old kibana config to start kibana without crashing since ephemeral tasks are deprecated*/
ephemeral_tasks: schema.maybe(schema.any()),
event_loop_delay: eventLoopDelaySchema,
kibanas_per_partition: schema.number({
defaultValue: DEFAULT_KIBANAS_PER_PARTITION,

View file

@ -1,24 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { of, Observable } from 'rxjs';
export const ephemeralTaskLifecycleMock = {
create(opts: { events$?: Observable<TaskLifecycleEvent>; getQueuedTasks?: () => number }) {
return {
attemptToRun: jest.fn(),
get events() {
return opts.events$ ?? of();
},
get queuedTasks() {
return opts.getQueuedTasks ? opts.getQueuedTasks() : 0;
},
} as unknown as jest.Mocked<EphemeralTaskLifecycle>;
},
};

View file

@ -1,414 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Subject } from 'rxjs';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { createInitialMiddleware } from './lib/middleware';
import { TaskTypeDictionary } from './task_type_dictionary';
import { mockLogger } from './test_utils';
import { asErr, asOk } from './lib/result_type';
import { FillPoolResult } from './lib/fill_pool';
import { EphemeralTaskLifecycle, EphemeralTaskLifecycleOpts } from './ephemeral_task_lifecycle';
import { v4 as uuidv4 } from 'uuid';
import { asTaskPollingCycleEvent, asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskRunResult } from './task_running';
import { TaskPoolRunResult } from './task_pool';
import { TaskPoolMock } from './task_pool/task_pool.mock';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from './mocks';
jest.mock('./constants', () => ({
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: ['report'],
}));
const executionContext = executionContextServiceMock.createSetupContract();
describe('EphemeralTaskLifecycle', () => {
function initTaskLifecycleParams({
config,
...optOverrides
}: {
config?: Partial<EphemeralTaskLifecycleOpts['config']>;
} & Partial<Omit<EphemeralTaskLifecycleOpts, 'config'>> = {}) {
const taskManagerLogger = mockLogger();
const poolCapacity = jest.fn();
const pool = TaskPoolMock.create(poolCapacity);
const lifecycleEvent$ = new Subject<TaskLifecycleEvent>();
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const opts: EphemeralTaskLifecycleOpts = {
logger: taskManagerLogger,
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
request_capacity: 1000,
allow_reading_invalid_state: false,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_stats_health_verbose_log: {
enabled: true,
level: 'debug',
warn_delayed_task_start_in_seconds: 60,
},
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
auto_calculate_default_ech_capacity: false,
...config,
},
elasticsearchAndSOAvailability$,
pool,
lifecycleEvent: lifecycleEvent$,
middleware: createInitialMiddleware(),
...optOverrides,
};
opts.definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
});
pool.run.mockResolvedValue(Promise.resolve(TaskPoolRunResult.RunningAllClaimedTasks));
return { poolCapacity, lifecycleEvent$, pool, elasticsearchAndSOAvailability$, opts };
}
describe('constructor', () => {
test('avoids unnecessary subscription if ephemeral tasks are disabled', () => {
const { opts } = initTaskLifecycleParams({
config: {
ephemeral_tasks: {
enabled: false,
request_capacity: 10,
},
},
});
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asErr(task));
});
test('queues up tasks when ephemeral tasks are enabled', () => {
const { opts } = initTaskLifecycleParams();
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));
});
test('rejects tasks when ephemeral tasks are enabled and queue is full', () => {
const { opts } = initTaskLifecycleParams({
config: { ephemeral_tasks: { enabled: true, request_capacity: 2 } },
});
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));
const task2 = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(task2)).toMatchObject(asOk(task2));
const rejectedTask = taskManagerMock.createTask();
expect(ephemeralTaskLifecycle.attemptToRun(rejectedTask)).toMatchObject(asErr(rejectedTask));
});
test('pulls tasks off queue when a polling cycle completes', () => {
const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams();
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = taskManagerMock.createTask({ id: `my-phemeral-task` });
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));
poolCapacity.mockReturnValue({
availableCapacity: 10,
});
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
expect(pool.run).toHaveBeenCalledTimes(1);
const taskRunners = pool.run.mock.calls[0][0];
expect(taskRunners).toHaveLength(1);
expect(`${taskRunners[0]}`).toMatchInlineSnapshot(`"foo \\"my-phemeral-task\\" (Ephemeral)"`);
});
test('pulls tasks off queue when a task run completes', () => {
const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams();
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const task = taskManagerMock.createTask({ id: `my-phemeral-task` });
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));
poolCapacity.mockReturnValue({
availableCapacity: 10,
});
lifecycleEvent$.next(
asTaskRunEvent(
uuidv4(),
asOk({
task: taskManagerMock.createTask(),
result: TaskRunResult.Success,
persistence: TaskPersistence.Ephemeral,
isExpired: false,
})
)
);
expect(pool.run).toHaveBeenCalledTimes(1);
const taskRunners = pool.run.mock.calls[0][0];
expect(taskRunners).toHaveLength(1);
expect(`${taskRunners[0]}`).toMatchInlineSnapshot(`"foo \\"my-phemeral-task\\" (Ephemeral)"`);
});
test('pulls as many tasks off queue as it has capacity for', () => {
const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams();
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const tasks = [
taskManagerMock.createTask(),
taskManagerMock.createTask(),
taskManagerMock.createTask(),
];
expect(ephemeralTaskLifecycle.attemptToRun(tasks[0])).toMatchObject(asOk(tasks[0]));
expect(ephemeralTaskLifecycle.attemptToRun(tasks[1])).toMatchObject(asOk(tasks[1]));
expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2]));
poolCapacity.mockReturnValue({
availableCapacity: 2,
});
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
expect(pool.run).toHaveBeenCalledTimes(1);
const taskRunners = pool.run.mock.calls[0][0];
expect(taskRunners).toHaveLength(2);
expect(`${taskRunners[0]}`).toEqual(`foo "${tasks[0].id}" (Ephemeral)`);
expect(`${taskRunners[1]}`).toEqual(`foo "${tasks[1].id}" (Ephemeral)`);
});
test('pulls only as many tasks of the same type as is allowed by maxConcurrency', () => {
const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams();
opts.definitions.registerTaskDefinitions({
report: {
title: 'report',
maxConcurrency: 1,
createTaskRunner: jest.fn(),
},
});
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
// both are queued
expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject(
asOk(firstLimitedTask)
);
expect(ephemeralTaskLifecycle.attemptToRun(secondLimitedTask)).toMatchObject(
asOk(secondLimitedTask)
);
// pool has capacity for both
poolCapacity.mockReturnValue({
availableCapacity: 10,
});
pool.getUsedCapacityByType.mockReturnValue(0);
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
expect(pool.run).toHaveBeenCalledTimes(1);
const taskRunners = pool.run.mock.calls[0][0];
expect(taskRunners).toHaveLength(1);
expect(`${taskRunners[0]}`).toEqual(`report "${firstLimitedTask.id}" (Ephemeral)`);
});
test('when pulling tasks from the queue, it takes into account the maxConcurrency of tasks that are already in the pool', () => {
const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams();
opts.definitions.registerTaskDefinitions({
report: {
title: 'report',
maxConcurrency: 1,
createTaskRunner: jest.fn(),
},
});
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
// both are queued
expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject(
asOk(firstLimitedTask)
);
expect(ephemeralTaskLifecycle.attemptToRun(secondLimitedTask)).toMatchObject(
asOk(secondLimitedTask)
);
// pool has capacity in general
poolCapacity.mockReturnValue({
availableCapacity: 2,
});
// but when we ask how many it has occupied by type - we always have one worker already occupied by that type
pool.getUsedCapacityByType.mockReturnValue(1);
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
expect(pool.run).toHaveBeenCalledTimes(0);
// now we release the worker in the pool and cause another cycle in the ephemeral queue
pool.getUsedCapacityByType.mockReturnValue(0);
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
expect(pool.run).toHaveBeenCalledTimes(1);
const taskRunners = pool.run.mock.calls[0][0];
expect(taskRunners).toHaveLength(1);
expect(`${taskRunners[0]}`).toEqual(`report "${firstLimitedTask.id}" (Ephemeral)`);
});
});
test('pulls tasks with both maxConcurrency and unlimited concurrency', () => {
const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams();
opts.definitions.registerTaskDefinitions({
report: {
title: 'report',
maxConcurrency: 1,
createTaskRunner: jest.fn(),
},
});
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const fooTasks = [
taskManagerMock.createTask(),
taskManagerMock.createTask(),
taskManagerMock.createTask(),
];
expect(ephemeralTaskLifecycle.attemptToRun(fooTasks[0])).toMatchObject(asOk(fooTasks[0]));
const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject(
asOk(firstLimitedTask)
);
expect(ephemeralTaskLifecycle.attemptToRun(fooTasks[1])).toMatchObject(asOk(fooTasks[1]));
const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' });
expect(ephemeralTaskLifecycle.attemptToRun(secondLimitedTask)).toMatchObject(
asOk(secondLimitedTask)
);
expect(ephemeralTaskLifecycle.attemptToRun(fooTasks[2])).toMatchObject(asOk(fooTasks[2]));
// pool has capacity for all
poolCapacity.mockReturnValue({
availableCapacity: 10,
});
pool.getUsedCapacityByType.mockReturnValue(0);
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(pool.run).toHaveBeenCalledTimes(1);
const taskRunners = pool.run.mock.calls[0][0];
expect(taskRunners).toHaveLength(4);
const asStrings = taskRunners.map((taskRunner) => `${taskRunner}`);
expect(asStrings).toContain(`foo "${fooTasks[0].id}" (Ephemeral)`);
expect(asStrings).toContain(`report "${firstLimitedTask.id}" (Ephemeral)`);
expect(asStrings).toContain(`foo "${fooTasks[1].id}" (Ephemeral)`);
expect(asStrings).toContain(`foo "${fooTasks[2].id}" (Ephemeral)`);
});
test('properly removes from the queue after pulled', () => {
const { poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams();
const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts);
const tasks = [
taskManagerMock.createTask(),
taskManagerMock.createTask(),
taskManagerMock.createTask(),
];
expect(ephemeralTaskLifecycle.attemptToRun(tasks[0])).toMatchObject(asOk(tasks[0]));
expect(ephemeralTaskLifecycle.attemptToRun(tasks[1])).toMatchObject(asOk(tasks[1]));
expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2]));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(3);
poolCapacity.mockReturnValue({
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(2);
poolCapacity.mockReturnValue({
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(1);
poolCapacity.mockReturnValue({
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(0);
});
});

View file

@ -1,210 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Subject, Observable, Subscription } from 'rxjs';
import { filter } from 'rxjs';
import { Logger, ExecutionContextStart } from '@kbn/core/server';
import { Result, asErr, asOk } from './lib/result_type';
import { TaskManagerConfig } from './config';
import { asTaskManagerStatEvent, isTaskRunEvent, isTaskPollingCycleEvent } from './task_events';
import { Middleware } from './lib/middleware';
import { EphemeralTaskInstance } from './task';
import { TaskTypeDictionary } from './task_type_dictionary';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { EphemeralTaskManagerRunner } from './task_running/ephemeral_task_runner';
import { TaskPool } from './task_pool';
export interface EphemeralTaskLifecycleOpts {
logger: Logger;
definitions: TaskTypeDictionary;
config: TaskManagerConfig;
middleware: Middleware;
elasticsearchAndSOAvailability$: Observable<boolean>;
pool: TaskPool;
lifecycleEvent: Observable<TaskLifecycleEvent>;
executionContext: ExecutionContextStart;
}
export type EphemeralTaskInstanceRequest = Omit<EphemeralTaskInstance, 'startedAt'>;
export class EphemeralTaskLifecycle {
private definitions: TaskTypeDictionary;
private pool: TaskPool;
private lifecycleEvent: Observable<TaskLifecycleEvent>;
// all task related events (task claimed, task marked as running, etc.) are emitted through events$
private events$ = new Subject<TaskLifecycleEvent>();
private ephemeralTaskQueue: Array<{
task: EphemeralTaskInstanceRequest;
enqueuedAt: number;
}> = [];
private logger: Logger;
private config: TaskManagerConfig;
private middleware: Middleware;
private lifecycleSubscription: Subscription = Subscription.EMPTY;
private readonly executionContext: ExecutionContextStart;
constructor({
logger,
middleware,
definitions,
pool,
lifecycleEvent,
config,
executionContext,
}: EphemeralTaskLifecycleOpts) {
this.logger = logger;
this.middleware = middleware;
this.definitions = definitions;
this.pool = pool;
this.lifecycleEvent = lifecycleEvent;
this.config = config;
this.executionContext = executionContext;
if (this.enabled) {
this.lifecycleSubscription = this.lifecycleEvent
.pipe(
filter((e) => {
const hasPollingCycleCompleted = isTaskPollingCycleEvent(e);
if (hasPollingCycleCompleted) {
this.emitEvent(
asTaskManagerStatEvent('queuedEphemeralTasks', asOk(this.queuedTasks))
);
}
return (
// when a polling cycle or a task run have just completed
(hasPollingCycleCompleted || isTaskRunEvent(e)) &&
// we want to know when the queue has ephemeral task run requests
this.queuedTasks > 0 &&
this.getCapacity() > 0
);
})
)
.subscribe((e) => {
let overallCapacity = this.getCapacity();
const capacityByType = new Map<string, number>();
const tasksWithinCapacity = [...this.ephemeralTaskQueue]
.filter(({ task }) => {
if (overallCapacity > 0) {
if (!capacityByType.has(task.taskType)) {
capacityByType.set(task.taskType, this.getCapacity(task.taskType));
}
if (capacityByType.get(task.taskType)! > 0) {
overallCapacity--;
capacityByType.set(task.taskType, capacityByType.get(task.taskType)! - 1);
return true;
}
}
})
.map((ephemeralTask) => {
const index = this.ephemeralTaskQueue.indexOf(ephemeralTask);
if (index >= 0) {
this.ephemeralTaskQueue.splice(index, 1);
}
this.emitEvent(
asTaskManagerStatEvent(
'ephemeralTaskDelay',
asOk(Date.now() - ephemeralTask.enqueuedAt)
)
);
return this.createTaskRunnerForTask(ephemeralTask.task);
});
if (tasksWithinCapacity.length) {
this.pool
.run(tasksWithinCapacity)
.then((successTaskPoolRunResult) => {
this.logger.debug(
`Successful ephemeral task lifecycle resulted in: ${successTaskPoolRunResult}`
);
})
.catch((error) => {
this.logger.debug(`Failed ephemeral task lifecycle resulted in: ${error}`);
});
}
});
}
}
public get enabled(): boolean {
return this.config.ephemeral_tasks.enabled;
}
public get events(): Observable<TaskLifecycleEvent> {
return this.events$;
}
private getCapacity = (taskType?: string) =>
taskType && this.definitions.get(taskType)?.maxConcurrency
? Math.max(
Math.min(
this.pool.availableCapacity(),
this.definitions.get(taskType)!.maxConcurrency! -
this.pool.getUsedCapacityByType(taskType)
),
0
)
: this.pool.availableCapacity();
private emitEvent = (event: TaskLifecycleEvent) => {
this.events$.next(event);
};
public attemptToRun(task: EphemeralTaskInstanceRequest) {
if (this.lifecycleSubscription.closed) {
return asErr(task);
}
return pushIntoSetWithTimestamp(
this.ephemeralTaskQueue,
this.config.ephemeral_tasks.request_capacity,
task
);
}
public get queuedTasks() {
return this.ephemeralTaskQueue.length;
}
private createTaskRunnerForTask = (
instance: EphemeralTaskInstanceRequest
): EphemeralTaskManagerRunner => {
return new EphemeralTaskManagerRunner({
logger: this.logger,
instance: {
...instance,
startedAt: new Date(),
},
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
beforeMarkRunning: this.middleware.beforeMarkRunning,
onTaskEvent: this.emitEvent,
executionContext: this.executionContext,
});
};
}
/**
* Pushes values into a bounded set
* @param set A bounded array of tasks with their enqueue timestamps
* @param maxCapacity How many values are we allowed to push into the set
* @param task A task to push into the set if there is capacity
*/
function pushIntoSetWithTimestamp(
set: Array<{
task: EphemeralTaskInstanceRequest;
enqueuedAt: number;
}>,
maxCapacity: number,
task: EphemeralTaskInstanceRequest
): Result<EphemeralTaskInstanceRequest, EphemeralTaskInstanceRequest> {
if (set.length >= maxCapacity) {
return asErr(task);
}
set.push({ task, enqueuedAt: Date.now() });
return asOk(task);
}
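
The core throttling rule in the deleted lifecycle is the `getCapacity` helper above: a task type with `maxConcurrency` may only claim whichever is smaller, the pool's free capacity or its remaining per-type allowance. A standalone restatement with sample numbers:

```typescript
// Standalone restatement of the deleted getCapacity logic.
function capacityFor(
  poolAvailable: number, // pool.availableCapacity()
  maxConcurrency: number | undefined, // the task definition's maxConcurrency, if any
  usedByType: number // pool.getUsedCapacityByType(taskType)
): number {
  return maxConcurrency !== undefined
    ? Math.max(Math.min(poolAvailable, maxConcurrency - usedByType), 0)
    : poolAvailable;
}

console.log(capacityFor(10, undefined, 0)); // 10: an unlimited type can use the whole pool
console.log(capacityFor(2, 1, 0)); // 1: a 'report'-style type is capped at one concurrent run
console.log(capacityFor(2, 1, 1)); // 0: the single 'report' slot is already occupied
```

This is exactly the behavior the maxConcurrency tests above assert: with pool capacity for two tasks but one `report` already running, no second `report` runner is pulled until the slot frees up.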

View file

@ -17,7 +17,6 @@ export const plugin = async (initContext: PluginInitializerContext) => {
export type {
TaskInstance,
ConcreteTaskInstance,
EphemeralTask,
TaskRunCreatorFunction,
RunContext,
IntervalSchedule,
@ -32,7 +31,6 @@ export {
isUnrecoverableError,
throwUnrecoverableError,
throwRetryableError,
isEphemeralTaskRejectedDueToCapacityError,
createTaskRunError,
TaskErrorSource,
} from './task_running';
@ -57,14 +55,6 @@ export const config: PluginConfigDescriptor<TaskManagerConfig> = {
schema: configSchema,
deprecations: ({ deprecate }) => {
return [
deprecate('ephemeral_tasks.enabled', 'a future version', {
level: 'warning',
message: `Configuring "xpack.task_manager.ephemeral_tasks.enabled" is deprecated and will be removed in a future version. Remove this setting to increase task execution resiliency.`,
}),
deprecate('ephemeral_tasks.request_capacity', 'a future version', {
level: 'warning',
message: `Configuring "xpack.task_manager.ephemeral_tasks.request_capacity" is deprecated and will be removed in a future version. Remove this setting to increase task execution resiliency.`,
}),
deprecate('max_workers', 'a future version', {
level: 'warning',
message: `Configuring "xpack.task_manager.max_workers" is deprecated and will be removed in a future version. Remove this setting and use "xpack.task_manager.capacity" instead.`,

View file

@ -70,10 +70,6 @@ describe('managed configuration', () => {
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
@ -220,10 +216,6 @@ describe('managed configuration', () => {
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
@ -346,10 +338,6 @@ describe('managed configuration', () => {
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,

View file

@ -42,10 +42,6 @@ const config = {
},
custom: {},
},
ephemeral_tasks: {
enabled: false,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
@ -195,7 +191,6 @@ const getStatsWithTimestamp = ({
persistence: {
recurring: 95,
non_recurring: 5,
ephemeral: 0,
},
result_frequency_percent_as_number: {
taskType1: {

View file

@ -507,7 +507,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
persistence: {
[TaskPersistence.Recurring]: 10,
[TaskPersistence.NonRecurring]: 10,
[TaskPersistence.Ephemeral]: 10,
},
result_frequency_percent_as_number: {},
},

View file

@ -41,10 +41,6 @@ const config: TaskManagerConfig = {
},
kibanas_per_partition: 2,
allow_reading_invalid_state: false,
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,

View file

@ -27,10 +27,8 @@ const createStartMock = () => {
bulkRemove: jest.fn(),
schedule: jest.fn(),
runSoon: jest.fn(),
ephemeralRunNow: jest.fn(),
ensureScheduled: jest.fn(),
removeIfExists: jest.fn().mockResolvedValue(Promise.resolve()), // it's a promise and there are some places where it's followed by `.catch()`
supportsEphemeralTasks: jest.fn(),
bulkUpdateSchedules: jest.fn(),
bulkSchedule: jest.fn(),
bulkDisable: jest.fn(),

View file

@ -35,12 +35,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -56,7 +50,6 @@ describe('estimateCapacity', () => {
},
// no non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 0,
recurring: 100,
},
@ -92,12 +85,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 2400,
p90: 2500,
p95: 3200,
p99: 3500,
},
non_recurring: {
p50: 1400,
p90: 1500,
@ -113,7 +100,6 @@ describe('estimateCapacity', () => {
},
// no non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 0,
recurring: 100,
},
@ -153,7 +139,6 @@ describe('estimateCapacity', () => {
duration_by_persistence: {},
// no non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 0,
recurring: 100,
},
@ -189,12 +174,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -210,7 +189,6 @@ describe('estimateCapacity', () => {
},
// no non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 0,
recurring: 100,
},
@ -247,12 +225,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -268,7 +240,6 @@ describe('estimateCapacity', () => {
},
// no non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 0,
recurring: 100,
},
@ -304,12 +275,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -325,7 +290,6 @@ describe('estimateCapacity', () => {
},
// no non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 0,
recurring: 100,
},
@ -374,12 +338,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -394,8 +352,6 @@ describe('estimateCapacity', () => {
},
},
persistence: {
// 50% of tasks are non-recurring/ephemeral executions in the system in recent history
ephemeral: 25,
non_recurring: 25,
recurring: 50,
},
@ -418,7 +374,7 @@ describe('estimateCapacity', () => {
});
});
test('estimates the min required kibana instances when there is sufficient capacity for recurring but not for non-recurring/ephemeral', async () => {
test('estimates the min required kibana instances when there is sufficient capacity for recurring but not for non-recurring', async () => {
const provisionedKibanaInstances = 2;
const recurringTasksPerMinute = 251;
// 50% for non-recurring/ephemeral + half of recurring task workload
@ -456,12 +412,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -476,8 +426,6 @@ describe('estimateCapacity', () => {
},
},
persistence: {
// 50% of tasks are non-recurring/ephemeral executions in the system in recent history
ephemeral: 25,
non_recurring: 25,
recurring: 50,
},
@ -541,12 +489,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -562,7 +504,6 @@ describe('estimateCapacity', () => {
},
// 20% average of non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 20,
recurring: 80,
},
@ -607,12 +548,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -628,7 +563,6 @@ describe('estimateCapacity', () => {
},
// no non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 20,
recurring: 80,
},
@ -673,12 +607,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -694,7 +622,6 @@ describe('estimateCapacity', () => {
},
// 20% average of non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 20,
recurring: 80,
},
@ -739,12 +666,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -761,7 +682,6 @@ describe('estimateCapacity', () => {
persistence: {
recurring: 0,
non_recurring: 70,
ephemeral: 30,
},
result_frequency_percent_as_number: {},
},
@ -776,7 +696,7 @@ describe('estimateCapacity', () => {
observed: {
observed_kibana_instances: 1,
avg_recurring_required_throughput_per_minute: 29,
// we observe 100% capacity on non-recurring/ephemeral tasks, which is 200tpm
// we observe 100% capacity on non-recurring tasks, which is 200tpm
// and add to that the 29tpm for recurring tasks
avg_required_throughput_per_minute_per_kibana: 229,
},
@ -816,12 +736,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -838,7 +752,6 @@ describe('estimateCapacity', () => {
persistence: {
recurring: 0,
non_recurring: 70,
ephemeral: 30,
},
result_frequency_percent_as_number: {},
},
@ -853,12 +766,12 @@ describe('estimateCapacity', () => {
observed: {
observed_kibana_instances: 1,
avg_recurring_required_throughput_per_minute: 210,
// we observe 100% capacity on non-recurring/ephemeral tasks, which is 200tpm
// we observe 100% capacity on non-recurring tasks, which is 200tpm
// and add to that the 210tpm for recurring tasks
avg_required_throughput_per_minute_per_kibana: 410,
},
proposed: {
// we propose provisioning 3 instances for recurring + non-recurring/ephemeral
// we propose provisioning 3 instances for recurring + non-recurring
provisioned_kibana: 3,
// but need at least 2 for recurring
min_required_kibana: 2,
@ -890,12 +803,6 @@ describe('estimateCapacity', () => {
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -911,7 +818,6 @@ describe('estimateCapacity', () => {
},
// no non-recurring executions in the system in recent history
persistence: {
ephemeral: 0,
non_recurring: 0,
recurring: 100,
},
@ -935,30 +841,6 @@ function mockStats(
runtime: Partial<Required<RawMonitoringStats['stats']>['runtime']['value']> = {}
): CapacityEstimationParams {
return {
ephemeral: {
status: HealthStatus.OK,
timestamp: new Date().toISOString(),
value: {
load: {
p50: 4,
p90: 6,
p95: 6,
p99: 6,
},
executionsPerCycle: {
p50: 4,
p90: 6,
p95: 6,
p99: 6,
},
queuedTasks: {
p50: 4,
p90: 6,
p95: 6,
p99: 6,
},
},
},
configuration: {
status: HealthStatus.OK,
timestamp: new Date().toISOString(),
@ -1026,12 +908,6 @@ function mockStats(
execution: {
duration: {},
duration_by_persistence: {
ephemeral: {
p50: 400,
p90: 500,
p95: 1200,
p99: 1500,
},
non_recurring: {
p50: 400,
p90: 500,
@ -1046,7 +922,6 @@ function mockStats(
},
},
persistence: {
ephemeral: 0,
non_recurring: 30,
recurring: 70,
},

View file

@ -104,9 +104,9 @@ export function estimateCapacity(
/**
* On average, how much of this kibana's capacity has been historically used to execute
* non-recurring and ephemeral tasks
* non-recurring tasks
*/
const averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana = percentageOf(
const averageCapacityUsedByNonRecurringTasksPerKibana = percentageOf(
capacityPerMinutePerKibana,
percentageOf(averageLoadPercentage, 100 - percentageOfExecutionsUsedByRecurringTasks)
);
@ -116,14 +116,14 @@ export function estimateCapacity(
* for recurring tasks
*/
const averageCapacityAvailableForRecurringTasksPerKibana =
capacityPerMinutePerKibana - averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana;
capacityPerMinutePerKibana - averageCapacityUsedByNonRecurringTasksPerKibana;
/**
* At times a cluster might experience spikes of NonRecurring/Ephemeral tasks which swamp Task Manager
* causing it to spend all its capacity on NonRecurring/Ephemeral tasks, which makes it much harder
* At times a cluster might experience spikes of NonRecurring tasks which swamp Task Manager
* causing it to spend all its capacity on NonRecurring tasks, which makes it much harder
* to estimate the required capacity.
* This is easy to identify as load will usually max out or all the workers are busy executing non-recurring
* or ephemeral tasks, and none are running recurring tasks.
* tasks, and none are running recurring tasks.
*/
const hasTooLittleCapacityToEstimateRequiredNonRecurringCapacity =
averageLoadPercentage === 100 || averageCapacityAvailableForRecurringTasksPerKibana === 0;
@ -165,24 +165,24 @@ export function estimateCapacity(
averageRecurringRequiredPerMinute / minRequiredKibanaInstances;
/**
* assuming the historical capacity needed for ephemeral and non-recurring tasks, plus
* assuming the historical capacity needed for non-recurring tasks, plus
* the amount we know each kibana would need for recurring tasks, how much capacity would
* each kibana need if following the minRequiredKibanaInstances?
*/
const averageRequiredThroughputPerMinutePerKibana =
averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana *
averageCapacityUsedByNonRecurringTasksPerKibana *
(assumedKibanaInstances / minRequiredKibanaInstances) +
averageRecurringRequiredPerMinute / minRequiredKibanaInstances;
const assumedAverageRecurringRequiredThroughputPerMinutePerKibana =
averageRecurringRequiredPerMinute / assumedKibanaInstances;
/**
* assuming the historical capacity needed for ephemeral and non-recurring tasks, plus
* assuming the historical capacity needed for non-recurring tasks, plus
* the amount we know each kibana would need for recurring tasks, how much capacity would
* each kibana need if the assumed current number were correct?
*/
const assumedRequiredThroughputPerMinutePerKibana =
averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana +
averageCapacityUsedByNonRecurringTasksPerKibana +
averageRecurringRequiredPerMinute / assumedKibanaInstances;
const { status, reason } = getHealthStatus(logger, {
@ -281,11 +281,7 @@ function getAverageDuration(
durations: Partial<TaskPersistenceTypes<AveragedStat>>
): Result<number, number> {
const result = stats.mean(
[
durations.ephemeral?.p50 ?? 0,
durations.non_recurring?.p50 ?? 0,
durations.recurring?.p50 ?? 0,
].filter((val) => val > 0)
[durations.non_recurring?.p50 ?? 0, durations.recurring?.p50 ?? 0].filter((val) => val > 0)
);
if (isNaN(result)) {
return asErr(result);
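
To make the renamed calculation concrete, a worked example with illustrative numbers (`percentageOf` is restated locally):

```typescript
// Worked example of the capacity split in estimateCapacity; numbers are illustrative.
const percentageOf = (value: number, percentage: number) => (percentage * value) / 100;

const capacityPerMinutePerKibana = 200; // tasks per minute one Kibana can execute
const averageLoadPercentage = 50; // observed average load
const percentageOfExecutionsUsedByRecurringTasks = 80; // from execution.persistence

// Load attributable to non-recurring tasks: 50% * (100 - 80)% = 10% of capacity,
// i.e. 20 tasks per minute on this Kibana.
const averageCapacityUsedByNonRecurringTasksPerKibana = percentageOf(
  capacityPerMinutePerKibana,
  percentageOf(averageLoadPercentage, 100 - percentageOfExecutionsUsedByRecurringTasks)
);

// Whatever remains is treated as available for recurring tasks: 200 - 20 = 180 tpm.
const averageCapacityAvailableForRecurringTasksPerKibana =
  capacityPerMinutePerKibana - averageCapacityUsedByNonRecurringTasksPerKibana;

console.log(averageCapacityUsedByNonRecurringTasksPerKibana); // 20
console.log(averageCapacityAvailableForRecurringTasksPerKibana); // 180
```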

View file

@ -38,10 +38,6 @@ describe('Configuration Statistics Aggregator', () => {
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,

View file

@ -1,384 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { v4 as uuidv4 } from 'uuid';
import { Subject, Observable } from 'rxjs';
import stats from 'stats-lite';
import { take, bufferCount, skip, map } from 'rxjs';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import {
asTaskRunEvent,
TaskTiming,
asTaskManagerStatEvent,
TaskPersistence,
} from '../task_events';
import { asOk } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { TaskRunResult } from '../task_running';
import {
createEphemeralTaskAggregator,
summarizeEphemeralStat,
SummarizedEphemeralTaskStat,
EphemeralTaskStat,
} from './ephemeral_task_statistics';
import { AggregatedStat } from '../lib/runtime_statistics_aggregator';
import { ephemeralTaskLifecycleMock } from '../ephemeral_task_lifecycle.mock';
import { times, takeRight, take as takeLeft } from 'lodash';
describe('Ephemeral Task Statistics', () => {
test('returns the average size of the ephemeral queue', async () => {
const queueSize = [2, 6, 10, 10, 10, 6, 2, 0, 0];
const events$ = new Subject<TaskLifecycleEvent>();
const getQueuedTasks = jest.fn();
const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
getQueuedTasks,
});
const runningAverageWindowSize = 5;
const ephemeralTaskAggregator = createEphemeralTaskAggregator(
ephemeralTaskLifecycle,
runningAverageWindowSize,
10
);
function expectWindowEqualsUpdate(
taskStat: AggregatedStat<SummarizedEphemeralTaskStat>,
window: number[]
) {
expect(taskStat.value.queuedTasks).toMatchObject({
p50: stats.percentile(window, 0.5),
p90: stats.percentile(window, 0.9),
p95: stats.percentile(window, 0.95),
p99: stats.percentile(window, 0.99),
});
}
return new Promise<void>((resolve) => {
ephemeralTaskAggregator
.pipe(
// skip initial stat which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
// Use 'summarizeEphemeralStat' to receive summarize stats
map(({ key, value }: AggregatedStat<EphemeralTaskStat>) => ({
key,
value: summarizeEphemeralStat(value).value,
})),
take(queueSize.length),
bufferCount(queueSize.length)
)
.subscribe((taskStats: Array<AggregatedStat<SummarizedEphemeralTaskStat>>) => {
expectWindowEqualsUpdate(taskStats[0], queueSize.slice(0, 1));
expectWindowEqualsUpdate(taskStats[1], queueSize.slice(0, 2));
expectWindowEqualsUpdate(taskStats[2], queueSize.slice(0, 3));
expectWindowEqualsUpdate(taskStats[3], queueSize.slice(0, 4));
expectWindowEqualsUpdate(taskStats[4], queueSize.slice(0, 5));
// from the 6th value, begin to drop old values as our window is 5
expectWindowEqualsUpdate(taskStats[5], queueSize.slice(1, 6));
expectWindowEqualsUpdate(taskStats[6], queueSize.slice(2, 7));
expectWindowEqualsUpdate(taskStats[7], queueSize.slice(3, 8));
resolve();
});
for (const size of queueSize) {
events$.next(asTaskManagerStatEvent('queuedEphemeralTasks', asOk(size)));
}
});
});
test('returns the average number of ephemeral tasks executed per polling cycle', async () => {
const tasksQueueSize = [5, 2, 5, 0];
const executionsPerCycle = [5, 0, 5];
// we expect one event per "task queue size event", and we simulate
// tasks being drained after each one of these events, so we expect
// the first cycle to show zero drained tasks
const expectedTasksDrainedEvents = [0, ...executionsPerCycle];
const events$ = new Subject<TaskLifecycleEvent>();
const getQueuedTasks = jest.fn();
const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
getQueuedTasks,
});
const runningAverageWindowSize = 5;
const ephemeralTaskAggregator = createEphemeralTaskAggregator(
ephemeralTaskLifecycle,
runningAverageWindowSize,
10
);
function expectWindowEqualsUpdate(
taskStat: AggregatedStat<SummarizedEphemeralTaskStat>,
window: number[]
) {
expect(taskStat.value.executionsPerCycle).toMatchObject({
p50: stats.percentile(window, 0.5),
p90: stats.percentile(window, 0.9),
p95: stats.percentile(window, 0.95),
p99: stats.percentile(window, 0.99),
});
}
return new Promise<void>((resolve) => {
ephemeralTaskAggregator
.pipe(
// skip initial stat which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
// Use 'summarizeEphemeralStat' to receive summarize stats
map(({ key, value }: AggregatedStat<EphemeralTaskStat>) => ({
key,
value: summarizeEphemeralStat(value).value,
})),
take(tasksQueueSize.length),
bufferCount(tasksQueueSize.length)
)
.subscribe((taskStats: Array<AggregatedStat<SummarizedEphemeralTaskStat>>) => {
taskStats.forEach((taskStat, index) => {
expectWindowEqualsUpdate(
taskStat,
takeRight(takeLeft(expectedTasksDrainedEvents, index + 1), runningAverageWindowSize)
);
});
resolve();
});
for (const tasksDrainedInCycle of executionsPerCycle) {
events$.next(
asTaskManagerStatEvent('queuedEphemeralTasks', asOk(tasksQueueSize.shift() ?? 0))
);
times(tasksDrainedInCycle, () => {
events$.next(mockTaskRunEvent());
});
}
events$.next(
asTaskManagerStatEvent('queuedEphemeralTasks', asOk(tasksQueueSize.shift() ?? 0))
);
});
});
test('returns the average load added per polling cycle by ephemeral tasks', async () => {
const tasksExecuted = [0, 5, 10, 10, 10, 5, 5, 0, 0, 0, 0, 0];
const expectedLoad = [0, 50, 100, 100, 100, 50, 50, 0, 0, 0, 0, 0];
const events$ = new Subject<TaskLifecycleEvent>();
const getQueuedTasks = jest.fn();
const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
getQueuedTasks,
});
const runningAverageWindowSize = 5;
const capacity = 10;
const ephemeralTaskAggregator = createEphemeralTaskAggregator(
ephemeralTaskLifecycle,
runningAverageWindowSize,
capacity
);
function expectWindowEqualsUpdate(
taskStat: AggregatedStat<SummarizedEphemeralTaskStat>,
window: number[]
) {
expect(taskStat.value.load).toMatchObject({
p50: stats.percentile(window, 0.5),
p90: stats.percentile(window, 0.9),
p95: stats.percentile(window, 0.95),
p99: stats.percentile(window, 0.99),
});
}
return new Promise<void>((resolve) => {
ephemeralTaskAggregator
.pipe(
// skip initial stat which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
// Use 'summarizeEphemeralStat' to receive summarize stats
map(({ key, value }: AggregatedStat<EphemeralTaskStat>) => ({
key,
value: summarizeEphemeralStat(value).value,
})),
take(tasksExecuted.length),
bufferCount(tasksExecuted.length)
)
.subscribe((taskStats: Array<AggregatedStat<SummarizedEphemeralTaskStat>>) => {
taskStats.forEach((taskStat, index) => {
expectWindowEqualsUpdate(
taskStat,
takeRight(takeLeft(expectedLoad, index + 1), runningAverageWindowSize)
);
});
resolve();
});
for (const tasksExecutedInCycle of tasksExecuted) {
times(tasksExecutedInCycle, () => {
events$.next(mockTaskRunEvent());
});
events$.next(asTaskManagerStatEvent('queuedEphemeralTasks', asOk(0)));
}
});
});
});
test('returns the average load added per polling cycle by ephemeral tasks when load exceeds capacity', async () => {
const tasksExecuted = [0, 5, 10, 20, 15, 10, 5, 0, 0, 0, 0, 0];
const expectedLoad = [0, 50, 100, 200, 150, 100, 50, 0, 0, 0, 0, 0];
const events$ = new Subject<TaskLifecycleEvent>();
const getQueuedTasks = jest.fn();
const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
getQueuedTasks,
});
const runningAverageWindowSize = 5;
const capacity = 10;
const ephemeralTaskAggregator = createEphemeralTaskAggregator(
ephemeralTaskLifecycle,
runningAverageWindowSize,
capacity
);
function expectWindowEqualsUpdate(
taskStat: AggregatedStat<SummarizedEphemeralTaskStat>,
window: number[]
) {
expect(taskStat.value.load).toMatchObject({
p50: stats.percentile(window, 0.5),
p90: stats.percentile(window, 0.9),
p95: stats.percentile(window, 0.95),
p99: stats.percentile(window, 0.99),
});
}
return new Promise<void>((resolve) => {
ephemeralTaskAggregator
.pipe(
// skip initial stat which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
// Use 'summarizeEphemeralStat' to receive summarize stats
map(({ key, value }: AggregatedStat<EphemeralTaskStat>) => ({
key,
value: summarizeEphemeralStat(value).value,
})),
take(tasksExecuted.length),
bufferCount(tasksExecuted.length)
)
.subscribe((taskStats: Array<AggregatedStat<SummarizedEphemeralTaskStat>>) => {
taskStats.forEach((taskStat, index) => {
expectWindowEqualsUpdate(
taskStat,
takeRight(takeLeft(expectedLoad, index + 1), runningAverageWindowSize)
);
});
resolve();
});
for (const tasksExecutedInCycle of tasksExecuted) {
times(tasksExecutedInCycle, () => {
events$.next(mockTaskRunEvent());
});
events$.next(asTaskManagerStatEvent('queuedEphemeralTasks', asOk(0)));
}
});
});
test('returns the average delay experienced by tasks in the ephemeral queue', async () => {
const taskDelays = [100, 150, 500, 100, 100, 200, 2000, 10000, 20000, 100];
const events$ = new Subject<TaskLifecycleEvent>();
const getQueuedTasks = jest.fn();
const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
getQueuedTasks,
});
const runningAverageWindowSize = 5;
const ephemeralTaskAggregator = createEphemeralTaskAggregator(
ephemeralTaskLifecycle,
runningAverageWindowSize,
10
);
function expectWindowEqualsUpdate(
taskStat: AggregatedStat<SummarizedEphemeralTaskStat>,
window: number[]
) {
expect(taskStat.value.delay).toMatchObject({
p50: stats.percentile(window, 0.5),
p90: stats.percentile(window, 0.9),
p95: stats.percentile(window, 0.95),
p99: stats.percentile(window, 0.99),
});
}
return new Promise<void>((resolve) => {
ephemeralTaskAggregator
.pipe(
        // skip the initial stat, which is just initialized data that
        // ensures we don't stall on combineLatest
skip(1),
        // Use 'summarizeEphemeralStat' to receive summarized stats
map(({ key, value }: AggregatedStat<EphemeralTaskStat>) => ({
key,
value: summarizeEphemeralStat(value).value,
})),
take(taskDelays.length),
bufferCount(taskDelays.length)
)
.subscribe((taskStats: Array<AggregatedStat<SummarizedEphemeralTaskStat>>) => {
taskStats.forEach((taskStat, index) => {
expectWindowEqualsUpdate(
taskStat,
takeRight(takeLeft(taskDelays, index + 1), runningAverageWindowSize)
);
});
resolve();
});
for (const delay of taskDelays) {
events$.next(asTaskManagerStatEvent('ephemeralTaskDelay', asOk(delay)));
}
});
});
const mockTaskRunEvent = (
overrides: Partial<ConcreteTaskInstance> = {},
timing: TaskTiming = {
start: 0,
stop: 0,
},
result: TaskRunResult = TaskRunResult.Success
) => {
const task = mockTaskInstance(overrides);
const persistence = TaskPersistence.Recurring;
return asTaskRunEvent(task.id, asOk({ task, persistence, result, isExpired: false }), timing);
};
const mockTaskInstance = (overrides: Partial<ConcreteTaskInstance> = {}): ConcreteTaskInstance => ({
id: uuidv4(),
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(),
scheduledAt: new Date(),
startedAt: new Date(),
retryAt: new Date(Date.now() + 5 * 60 * 1000),
state: {},
taskType: 'alerting:test',
params: {
alertId: '1',
},
ownerId: null,
...overrides,
});
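
The assertions above all lean on the same sliding-window arithmetic: after the i-th polling cycle, the summarized stat should reflect the last `runningAverageWindowSize` values seen so far, with percentiles computed over that window. Below is a minimal, self-contained sketch of that logic; `windowed` and `percentile` are illustrative stand-ins for the `takeRight(takeLeft(...))` composition and the `stats.percentile` helper the tests use, and are only assumed to behave similarly.

```ts
// Recompute the expected window the way expectWindowEqualsUpdate does:
// the last `size` values of the first `index + 1` entries in the series.
function windowed(series: number[], index: number, size: number): number[] {
  return series.slice(0, index + 1).slice(-size);
}

// A simple nearest-rank percentile, assumed to approximate stats.percentile.
function percentile(window: number[], p: number): number {
  if (window.length === 0) return 0;
  const sorted = [...window].sort((a, b) => a - b);
  return sorted[Math.max(0, Math.ceil(p * sorted.length) - 1)];
}

const expectedLoad = [0, 50, 100, 200, 150, 100, 50, 0];
const windowSize = 5;
expectedLoad.forEach((_, i) => {
  const w = windowed(expectedLoad, i, windowSize);
  console.log(i, w, { p50: percentile(w, 0.5), p90: percentile(w, 0.9) });
});
```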

View file

@ -1,127 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { map, filter, startWith, buffer, share } from 'rxjs';
import { JsonObject } from '@kbn/utility-types';
import { combineLatest, Observable, zip } from 'rxjs';
import { isOk, Ok } from '../lib/result_type';
import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { isTaskRunEvent, isTaskManagerStatEvent } from '../task_events';
import {
AveragedStat,
calculateRunningAverage,
createRunningAveragedStat,
} from './task_run_calculators';
import { HealthStatus } from './monitoring_stats_stream';
export interface EphemeralTaskStat extends JsonObject {
queuedTasks: number[];
executionsPerCycle: number[];
load: number[];
delay: number[];
}
export interface SummarizedEphemeralTaskStat extends JsonObject {
queuedTasks: AveragedStat;
executionsPerCycle: AveragedStat;
load: AveragedStat;
}
export function createEphemeralTaskAggregator(
ephemeralTaskLifecycle: EphemeralTaskLifecycle,
runningAverageWindowSize: number,
capacity: number
): AggregatedStatProvider<EphemeralTaskStat> {
const ephemeralTaskRunEvents$ = ephemeralTaskLifecycle.events.pipe(
filter((taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent))
);
const ephemeralQueueSizeEvents$: Observable<number> = ephemeralTaskLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskManagerStatEvent(taskEvent) &&
taskEvent.id === 'queuedEphemeralTasks' &&
isOk<number, never>(taskEvent.event)
),
map<TaskLifecycleEvent, number>((taskEvent: TaskLifecycleEvent) => {
return (taskEvent.event as unknown as Ok<number>).value;
}),
// as we consume this stream twice below (in the buffer, and the zip)
    // we want to use share, otherwise there'll be 2 subscribers and both will emit events
share()
);
const ephemeralQueueExecutionsPerCycleQueue =
createRunningAveragedStat<number>(runningAverageWindowSize);
const ephemeralQueuedTasksQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const ephemeralTaskLoadQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const ephemeralPollingCycleBasedStats$ = zip(
ephemeralTaskRunEvents$.pipe(
buffer(ephemeralQueueSizeEvents$),
map((taskEvents: TaskLifecycleEvent[]) => taskEvents.length)
),
ephemeralQueueSizeEvents$
).pipe(
map(([tasksRanSincePreviousQueueSize, ephemeralQueueSize]) => ({
queuedTasks: ephemeralQueuedTasksQueue(ephemeralQueueSize),
executionsPerCycle: ephemeralQueueExecutionsPerCycleQueue(tasksRanSincePreviousQueueSize),
load: ephemeralTaskLoadQueue(calculateWorkerLoad(capacity, tasksRanSincePreviousQueueSize)),
})),
startWith({
queuedTasks: [],
executionsPerCycle: [],
load: [],
})
);
const ephemeralTaskDelayQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const ephemeralTaskDelayEvents$: Observable<number[]> = ephemeralTaskLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskManagerStatEvent(taskEvent) &&
taskEvent.id === 'ephemeralTaskDelay' &&
isOk<number, never>(taskEvent.event)
),
map<TaskLifecycleEvent, number[]>((taskEvent: TaskLifecycleEvent) => {
return ephemeralTaskDelayQueue((taskEvent.event as unknown as Ok<number>).value);
}),
startWith([])
);
return combineLatest([ephemeralPollingCycleBasedStats$, ephemeralTaskDelayEvents$]).pipe(
map(([stats, delay]: [Omit<EphemeralTaskStat, 'delay'>, EphemeralTaskStat['delay']]) => {
return {
key: 'ephemeral',
value: { ...stats, delay },
} as AggregatedStat<EphemeralTaskStat>;
})
);
}
function calculateWorkerLoad(maxWorkers: number, tasksExecuted: number) {
return Math.round((tasksExecuted * 100) / maxWorkers);
}
export function summarizeEphemeralStat({
queuedTasks,
executionsPerCycle,
load,
delay,
}: EphemeralTaskStat): { value: SummarizedEphemeralTaskStat; status: HealthStatus } {
return {
value: {
queuedTasks: calculateRunningAverage(queuedTasks.length ? queuedTasks : [0]),
load: calculateRunningAverage(load.length ? load : [0]),
executionsPerCycle: calculateRunningAverage(
executionsPerCycle.length ? executionsPerCycle : [0]
),
delay: calculateRunningAverage(delay.length ? delay : [0]),
},
status: HealthStatus.OK,
};
}
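
The per-cycle wiring in `createEphemeralTaskAggregator` can be hard to read in context, so here is a stripped-down sketch of the same `buffer` + `zip` pattern. The names are hypothetical; only the operator wiring mirrors the code above. Task-run events are buffered until the queue-size stat for the cycle arrives, and `share()` keeps the two consumers of that stream on a single subscription.

```ts
import { Subject, zip, buffer, map, share } from 'rxjs';

const taskRuns$ = new Subject<void>();
const queueSize$ = new Subject<number>();

// share() matters because this stream is consumed twice below:
// once as the buffer's closing notifier and once as a zip input.
const queueSizeShared$ = queueSize$.pipe(share());

zip(
  taskRuns$.pipe(
    // collect the runs that happened since the previous queue-size emission
    buffer(queueSizeShared$),
    map((events) => events.length)
  ),
  queueSizeShared$
).subscribe(([executionsThisCycle, queuedTasks]) => {
  console.log({ executionsThisCycle, queuedTasks });
});

// Simulate one polling cycle: three task runs, then the cycle's queue stat.
taskRuns$.next();
taskRuns$.next();
taskRuns$.next();
queueSize$.next(0); // logs { executionsThisCycle: 3, queuedTasks: 0 }
```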

View file

@ -16,7 +16,6 @@ import {
import { TaskStore } from '../task_store';
import { TaskPollingLifecycle } from '../polling_lifecycle';
import { ManagedConfiguration } from '../lib/create_managed_configuration';
import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle';
import { AdHocTaskCounter } from '../lib/adhoc_task_counter';
import { TaskTypeDictionary } from '../task_type_dictionary';
@ -37,7 +36,6 @@ export interface CreateMonitoringStatsOpts {
adHocTaskCounter: AdHocTaskCounter;
taskDefinitions: TaskTypeDictionary;
taskPollingLifecycle?: TaskPollingLifecycle;
ephemeralTaskLifecycle?: EphemeralTaskLifecycle;
}
export function createMonitoringStats(

View file

@ -16,12 +16,6 @@ import {
SummarizedWorkloadStat,
WorkloadStat,
} from './workload_statistics';
import {
EphemeralTaskStat,
createEphemeralTaskAggregator,
SummarizedEphemeralTaskStat,
summarizeEphemeralStat,
} from './ephemeral_task_statistics';
import {
createTaskRunAggregator,
summarizeTaskRunStat,
@ -45,7 +39,6 @@ export interface MonitoringStats {
configuration?: MonitoredStat<ConfigStat>;
workload?: MonitoredStat<WorkloadStat>;
runtime?: MonitoredStat<TaskRunStat>;
ephemeral?: MonitoredStat<EphemeralTaskStat>;
utilization?: MonitoredStat<BackgroundTaskUtilizationStat>;
};
}
@ -72,7 +65,6 @@ export interface RawMonitoringStats {
configuration?: RawMonitoredStat<ConfigStat>;
workload?: RawMonitoredStat<SummarizedWorkloadStat>;
runtime?: RawMonitoredStat<SummarizedTaskRunStat>;
ephemeral?: RawMonitoredStat<SummarizedEphemeralTaskStat>;
capacity_estimation?: RawMonitoredStat<CapacityEstimationStat>;
};
}
@ -86,7 +78,6 @@ export function createAggregators({
taskDefinitions,
adHocTaskCounter,
taskPollingLifecycle,
ephemeralTaskLifecycle,
}: CreateMonitoringStatsOpts): AggregatedStatProvider {
const aggregators: AggregatedStatProvider[] = [
createConfigurationAggregator(config, managedConfig),
@ -111,15 +102,6 @@ export function createAggregators({
)
);
}
if (ephemeralTaskLifecycle && ephemeralTaskLifecycle.enabled) {
aggregators.push(
createEphemeralTaskAggregator(
ephemeralTaskLifecycle,
config.monitored_stats_running_average_window,
managedConfig.startingCapacity
)
);
}
return merge(...aggregators);
}
@ -156,7 +138,7 @@ export function summarizeMonitoringStats(
{
// eslint-disable-next-line @typescript-eslint/naming-convention
last_update,
stats: { runtime, workload, configuration, ephemeral, utilization },
stats: { runtime, workload, configuration, utilization },
}: MonitoringStats,
config: TaskManagerConfig,
assumedKibanaInstances: number
@ -188,14 +170,6 @@ export function summarizeMonitoringStats(
},
}
: {}),
...(ephemeral
? {
ephemeral: {
timestamp: ephemeral.timestamp,
...summarizeEphemeralStat(ephemeral.value),
},
}
: {}),
},
assumedKibanaInstances
);

View file

@ -455,10 +455,7 @@ describe('Task Run Statistics', () => {
{ start: 0, stop: 0 },
TaskRunResult.Success
),
mockTaskRunEvent({}, { start: 0, stop: 0 }, TaskRunResult.Success, TaskPersistence.Ephemeral),
mockTaskRunEvent({}, { start: 0, stop: 0 }, TaskRunResult.Success, TaskPersistence.Ephemeral),
mockTaskRunEvent({}, { start: 0, stop: 0 }, TaskRunResult.Success),
mockTaskRunEvent({}, { start: 0, stop: 0 }, TaskRunResult.Success, TaskPersistence.Ephemeral),
mockTaskRunEvent(
{ schedule: { interval: '3s' } },
{ start: 0, stop: 0 },
@ -490,79 +487,52 @@ describe('Task Run Statistics', () => {
.toMatchInlineSnapshot(`
Array [
Object {
"ephemeral": 0,
"non_recurring": 100,
"recurring": 0,
},
Object {
"ephemeral": 0,
"non_recurring": 100,
"recurring": 0,
},
Object {
"ephemeral": 0,
"non_recurring": 67,
"recurring": 33,
},
Object {
"ephemeral": 0,
"non_recurring": 75,
"recurring": 25,
},
Object {
"ephemeral": 0,
"non_recurring": 80,
"recurring": 20,
},
Object {
"ephemeral": 0,
"non_recurring": 60,
"recurring": 40,
},
Object {
"ephemeral": 0,
"non_recurring": 40,
"recurring": 60,
},
Object {
"ephemeral": 0,
"non_recurring": 60,
"recurring": 40,
},
Object {
"ephemeral": 0,
"non_recurring": 60,
"recurring": 40,
},
Object {
"ephemeral": 0,
"non_recurring": 40,
"recurring": 60,
},
Object {
"ephemeral": 20,
"non_recurring": 40,
"non_recurring": 60,
"recurring": 40,
},
Object {
"ephemeral": 40,
"non_recurring": 40,
"recurring": 20,
},
Object {
"ephemeral": 40,
"non_recurring": 40,
"recurring": 20,
},
Object {
"ephemeral": 60,
"non_recurring": 20,
"recurring": 20,
},
Object {
"ephemeral": 60,
"non_recurring": 20,
"recurring": 20,
"non_recurring": 60,
"recurring": 40,
},
]
`);

View file

@ -92,7 +92,6 @@ interface ResultFrequency extends JsonObject {
export interface TaskPersistenceTypes<T extends JsonValue = number> extends JsonObject {
[TaskPersistence.Recurring]: T;
[TaskPersistence.NonRecurring]: T;
[TaskPersistence.Ephemeral]: T;
}
type ResultFrequencySummary = ResultFrequency & {
@ -247,7 +246,6 @@ export function createTaskRunAggregator(
duration_by_persistence: {
[TaskPersistence.Recurring]: [],
[TaskPersistence.NonRecurring]: [],
[TaskPersistence.Ephemeral]: [],
},
result_frequency_percent_as_number: {},
persistence: [],
@ -401,7 +399,6 @@ export function summarizeTaskRunStat(
persistence: {
[TaskPersistence.Recurring]: 0,
[TaskPersistence.NonRecurring]: 0,
[TaskPersistence.Ephemeral]: 0,
...calculateFrequency<TaskPersistence>(persistence),
},
result_frequency_percent_as_number: mapValues(
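
For context on the hunk above: the persistence percentages come from counting each `TaskPersistence` value in the window and spreading the result over zero defaults, which is why dropping the `Ephemeral` default removes the key from the summarized output entirely. A rough sketch of that calculation follows; `calculateFrequency` here is a hypothetical stand-in for the helper `summarizeTaskRunStat` uses.

```ts
type Persistence = 'recurring' | 'non_recurring';

// Count each persistence type and convert to whole-number percentages.
function calculateFrequency(values: Persistence[]): Record<string, number> {
  const counts = new Map<string, number>();
  for (const v of values) counts.set(v, (counts.get(v) ?? 0) + 1);
  const result: Record<string, number> = {};
  for (const [key, count] of counts) {
    result[key] = Math.round((count * 100) / values.length);
  }
  return result;
}

// Spreading over zero defaults guarantees both keys are always present,
// so removing the Ephemeral default removes it from the output shape.
const persistence = {
  recurring: 0,
  non_recurring: 0,
  ...calculateFrequency(['recurring', 'non_recurring', 'non_recurring']),
};
console.log(persistence); // { recurring: 33, non_recurring: 67 }
```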

View file

@ -17,9 +17,6 @@ import { cloudMock } from '@kbn/cloud-plugin/public/mocks';
import { taskPollingLifecycleMock } from './polling_lifecycle.mock';
import { TaskPollingLifecycle } from './polling_lifecycle';
import type { TaskPollingLifecycle as TaskPollingLifecycleClass } from './polling_lifecycle';
import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import type { EphemeralTaskLifecycle as EphemeralTaskLifecycleClass } from './ephemeral_task_lifecycle';
let mockTaskPollingLifecycle = taskPollingLifecycleMock.create({});
jest.mock('./polling_lifecycle', () => {
@ -30,15 +27,6 @@ jest.mock('./polling_lifecycle', () => {
};
});
let mockEphemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({});
jest.mock('./ephemeral_task_lifecycle', () => {
return {
EphemeralTaskLifecycle: jest.fn().mockImplementation(() => {
return mockEphemeralTaskLifecycle;
}),
};
});
const deleteCurrentNodeSpy = jest.spyOn(KibanaDiscoveryService.prototype, 'deleteCurrentNode');
const discoveryIsStarted = jest.spyOn(KibanaDiscoveryService.prototype, 'isStarted');
@ -69,10 +57,6 @@ const pluginInitializerContextParams = {
},
custom: {},
},
ephemeral_tasks: {
enabled: false,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
@ -94,8 +78,6 @@ describe('TaskManagerPlugin', () => {
beforeEach(() => {
mockTaskPollingLifecycle = taskPollingLifecycleMock.create({});
(TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>).mockClear();
mockEphemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({});
(EphemeralTaskLifecycle as jest.Mock<EphemeralTaskLifecycleClass>).mockClear();
});
describe('setup', () => {
@ -164,9 +146,6 @@ describe('TaskManagerPlugin', () => {
});
expect(TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>).toHaveBeenCalledTimes(1);
expect(
EphemeralTaskLifecycle as jest.Mock<EphemeralTaskLifecycleClass>
).toHaveBeenCalledTimes(1);
});
test('should not initialize task polling lifecycle if node.roles.backgroundTasks is false', async () => {
@ -181,9 +160,6 @@ describe('TaskManagerPlugin', () => {
});
expect(TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>).not.toHaveBeenCalled();
expect(
EphemeralTaskLifecycle as jest.Mock<EphemeralTaskLifecycleClass>
).not.toHaveBeenCalled();
});
});

View file

@ -35,8 +35,7 @@ import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskScheduling } from './task_scheduling';
import { backgroundTaskUtilizationRoute, healthRoute, metricsRoute } from './routes';
import { createMonitoringStats, MonitoringStats } from './monitoring';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTask, ConcreteTaskInstance } from './task';
import { ConcreteTaskInstance } from './task';
import { registerTaskManagerUsageCollector } from './usage';
import { TASK_MANAGER_INDEX } from './constants';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
@ -67,7 +66,6 @@ export type TaskManagerStartContract = Pick<
TaskScheduling,
| 'schedule'
| 'runSoon'
| 'ephemeralRunNow'
| 'ensureScheduled'
| 'bulkUpdateSchedules'
| 'bulkEnable'
@ -78,7 +76,6 @@ export type TaskManagerStartContract = Pick<
Pick<TaskStore, 'fetch' | 'aggregate' | 'get' | 'remove' | 'bulkRemove'> & {
removeIfExists: TaskStore['remove'];
} & {
supportsEphemeralTasks: () => boolean;
getRegisteredTypes: () => string[];
};
@ -92,7 +89,6 @@ export class TaskManagerPlugin
implements Plugin<TaskManagerSetupContract, TaskManagerStartContract>
{
private taskPollingLifecycle?: TaskPollingLifecycle;
private ephemeralTaskLifecycle?: EphemeralTaskLifecycle;
private taskManagerId?: string;
private usageCounter?: UsageCounter;
private config: TaskManagerConfig;
@ -218,8 +214,6 @@ export class TaskManagerPlugin
usageCollection,
monitoredHealth$,
monitoredUtilization$,
this.config.ephemeral_tasks.enabled,
this.config.ephemeral_tasks.request_capacity,
this.config.unsafe.exclude_task_types
);
}
@ -350,17 +344,6 @@ export class TaskManagerPlugin
...managedConfiguration,
taskPartitioner,
});
this.ephemeralTaskLifecycle = new EphemeralTaskLifecycle({
config: this.config!,
definitions: this.definitions,
logger: this.logger,
executionContext,
middleware: this.middleware,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
pool: this.taskPollingLifecycle.pool,
lifecycleEvent: this.taskPollingLifecycle.events,
});
}
createMonitoringStats({
@ -372,7 +355,6 @@ export class TaskManagerPlugin
adHocTaskCounter: this.adHocTaskCounter,
taskDefinitions: this.definitions,
taskPollingLifecycle: this.taskPollingLifecycle,
ephemeralTaskLifecycle: this.ephemeralTaskLifecycle,
}).subscribe((stat) => this.monitoringStats$.next(stat));
metricsStream({
@ -387,7 +369,6 @@ export class TaskManagerPlugin
logger: this.logger,
taskStore,
middleware: this.middleware,
ephemeralTaskLifecycle: this.ephemeralTaskLifecycle,
taskManagerId: taskStore.taskManagerId,
});
@ -409,9 +390,6 @@ export class TaskManagerPlugin
bulkEnable: (...args) => taskScheduling.bulkEnable(...args),
bulkDisable: (...args) => taskScheduling.bulkDisable(...args),
bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),
ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task),
supportsEphemeralTasks: () =>
this.config.ephemeral_tasks.enabled && this.shouldRunBackgroundTasks,
getRegisteredTypes: () => this.definitions.getAllTypes(),
bulkUpdateState: (...args) => taskScheduling.bulkUpdateState(...args),
};

View file

@ -84,10 +84,6 @@ describe('TaskPollingLifecycle', () => {
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,

View file

@ -26,7 +26,6 @@ import {
asTaskPollingCycleEvent,
TaskManagerStat,
asTaskManagerStatEvent,
EphemeralTaskRejectedDueToCapacity,
TaskManagerMetric,
} from './task_events';
import { fillPool, FillPoolResult, TimedFillPoolResult } from './lib/fill_pool';
@ -71,8 +70,7 @@ export type TaskLifecycleEvent =
| TaskRunRequest
| TaskPollingCycle
| TaskManagerStat
| TaskManagerMetric
| EphemeralTaskRejectedDueToCapacity;
| TaskManagerMetric;
/**
* The public interface into the task manager system.

View file

@ -299,7 +299,6 @@ describe('healthRoute', () => {
const warnRuntimeStat = mockHealthStats();
const warnConfigurationStat = mockHealthStats();
const warnWorkloadStat = mockHealthStats();
const warnEphemeralStat = mockHealthStats();
const stats$ = new Subject<MonitoringStats>();
@ -334,15 +333,13 @@ describe('healthRoute', () => {
stats$.next(warnConfigurationStat);
await sleep(1001);
stats$.next(warnWorkloadStat);
await sleep(1001);
stats$.next(warnEphemeralStat);
expect(await serviceStatus).toMatchObject({
level: ServiceStatusLevels.degraded,
summary: `Task Manager is unhealthy - Reason: ${reason}`,
});
expect(logHealthMetrics).toBeCalledTimes(4);
expect(logHealthMetrics).toBeCalledTimes(3);
expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({
id,
timestamp: expect.any(String),
@ -367,14 +364,6 @@ describe('healthRoute', () => {
summarizeMonitoringStats(logger, warnWorkloadStat, getTaskManagerConfig({}))
),
});
expect(logHealthMetrics.mock.calls[3][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(logger, warnEphemeralStat, getTaskManagerConfig({}))
),
});
});
it(`logs at an error level if the status is error`, async () => {
@ -402,7 +391,6 @@ describe('healthRoute', () => {
const errorRuntimeStat = mockHealthStats();
const errorConfigurationStat = mockHealthStats();
const errorWorkloadStat = mockHealthStats();
const errorEphemeralStat = mockHealthStats();
const stats$ = new Subject<MonitoringStats>();
@ -437,15 +425,13 @@ describe('healthRoute', () => {
stats$.next(errorConfigurationStat);
await sleep(1001);
stats$.next(errorWorkloadStat);
await sleep(1001);
stats$.next(errorEphemeralStat);
expect(await serviceStatus).toMatchObject({
level: ServiceStatusLevels.degraded,
summary: `Task Manager is unhealthy - Reason: ${reason}`,
});
expect(logHealthMetrics).toBeCalledTimes(4);
expect(logHealthMetrics).toBeCalledTimes(3);
expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({
id,
timestamp: expect.any(String),
@ -470,14 +456,6 @@ describe('healthRoute', () => {
summarizeMonitoringStats(logger, errorWorkloadStat, getTaskManagerConfig({}))
),
});
expect(logHealthMetrics.mock.calls[3][0]).toMatchObject({
id,
timestamp: expect.any(String),
status: expect.any(String),
...ignoreCapacityEstimation(
summarizeMonitoringStats(logger, errorEphemeralStat, getTaskManagerConfig({}))
),
});
});
it('returns a error status if the overall stats have not been updated within the required hot freshness', async () => {
@ -548,9 +526,6 @@ describe('healthRoute', () => {
workload: {
timestamp: expect.any(String),
},
ephemeral: {
timestamp: expect.any(String),
},
runtime: {
timestamp: expect.any(String),
value: {
@ -653,9 +628,6 @@ describe('healthRoute', () => {
workload: {
timestamp: expect.any(String),
},
ephemeral: {
timestamp: expect.any(String),
},
runtime: {
timestamp: expect.any(String),
value: {
@ -737,9 +709,6 @@ describe('healthRoute', () => {
workload: {
timestamp: expect.any(String),
},
ephemeral: {
timestamp: expect.any(String),
},
runtime: {
timestamp: expect.any(String),
value: {
@ -952,15 +921,6 @@ function mockHealthStats(overrides = {}) {
},
},
},
ephemeral: {
timestamp: new Date().toISOString(),
value: {
load: [],
executionsPerCycle: [],
queuedTasks: [],
delay: [],
},
},
},
};
return merge(stub, overrides) as unknown as MonitoringStats;

View file

@ -471,16 +471,6 @@ export interface ConcreteTaskInstanceVersion {
error?: string;
}
/**
* A task instance that has an id and is ready for storage.
*/
export type EphemeralTask = Pick<
ConcreteTaskInstance,
'taskType' | 'params' | 'state' | 'scope' | 'enabled'
>;
export type EphemeralTaskInstance = EphemeralTask &
Pick<ConcreteTaskInstance, 'id' | 'scheduledAt' | 'startedAt' | 'runAt' | 'status' | 'ownerId'>;
export type SerializedConcreteTaskInstance = Omit<
ConcreteTaskInstance,
'state' | 'params' | 'scheduledAt' | 'startedAt' | 'retryAt' | 'runAt'

View file

@ -13,14 +13,12 @@ import { Result, Err } from './lib/result_type';
import { ClaimAndFillPoolResult } from './lib/fill_pool';
import { PollingError } from './polling';
import { DecoratedError, TaskRunResult } from './task_running';
import { EphemeralTaskInstanceRequest } from './ephemeral_task_lifecycle';
import type { EventLoopDelayConfig } from './config';
import { TaskManagerMetrics } from './metrics/task_metrics_collector';
export enum TaskPersistence {
Recurring = 'recurring',
NonRecurring = 'non_recurring',
Ephemeral = 'ephemeral',
}
export enum TaskEventType {
@ -31,7 +29,6 @@ export enum TaskEventType {
TASK_POLLING_CYCLE = 'TASK_POLLING_CYCLE',
TASK_MANAGER_METRIC = 'TASK_MANAGER_METRIC',
TASK_MANAGER_STAT = 'TASK_MANAGER_STAT',
EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY = 'EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY',
}
export interface TaskTiming {
@ -82,7 +79,6 @@ export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRun = TaskEvent<RanTask, ErroredTask>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;
export type EphemeralTaskRejectedDueToCapacity = TaskEvent<EphemeralTaskInstanceRequest, Error>;
export type TaskPollingCycle<T = string> = TaskEvent<ClaimAndFillPoolResult, PollingError<T>>;
export type TaskManagerMetric = TaskEvent<TaskManagerMetrics, Error>;
@ -90,8 +86,6 @@ export type TaskManagerStats =
| 'load'
| 'pollingDelay'
| 'claimDuration'
| 'queuedEphemeralTasks'
| 'ephemeralTaskDelay'
| 'workerUtilization'
| 'runDelay';
export type TaskManagerStat = TaskEvent<number, never, TaskManagerStats>;
@ -187,19 +181,6 @@ export function asTaskManagerMetricEvent(
};
}
export function asEphemeralTaskRejectedDueToCapacityEvent(
id: string,
event: Result<EphemeralTaskInstanceRequest, Error>,
timing?: TaskTiming
): EphemeralTaskRejectedDueToCapacity {
return {
id,
type: TaskEventType.EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY,
event,
timing,
};
}
export function isTaskMarkRunningEvent(
taskEvent: TaskEvent<unknown, unknown>
): taskEvent is TaskMarkRunning {
@ -236,8 +217,3 @@ export function isTaskManagerMetricEvent(
): taskEvent is TaskManagerStat {
return taskEvent.type === TaskEventType.TASK_MANAGER_METRIC;
}
export function isEphemeralTaskRejectedDueToCapacityEvent(
taskEvent: TaskEvent<unknown, unknown>
): taskEvent is EphemeralTaskRejectedDueToCapacity {
return taskEvent.type === TaskEventType.EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY;
}
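
The events removed here follow the same tagged-union pattern as the ones that remain: every event carries a `type` discriminant and pairs with a type-guard, so deleting `EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY` also means deleting its guard. A minimal sketch of that pattern, with simplified names rather than the real task_events types:

```ts
enum TaskEventType {
  TASK_RUN = 'TASK_RUN',
  TASK_MANAGER_STAT = 'TASK_MANAGER_STAT',
}

interface TaskEvent<T> {
  id?: string;
  type: TaskEventType;
  event: T;
}

// The guard narrows the union on its discriminant, mirroring
// isTaskRunEvent / isTaskManagerStatEvent above.
function isTaskRunEvent(e: TaskEvent<unknown>): e is TaskEvent<{ taskId: string }> {
  return e.type === TaskEventType.TASK_RUN;
}

const events: Array<TaskEvent<unknown>> = [
  { type: TaskEventType.TASK_RUN, event: { taskId: 'abc' } },
  { type: TaskEventType.TASK_MANAGER_STAT, event: 42 },
];

for (const e of events) {
  if (isTaskRunEvent(e)) {
    // Narrowed: safe to read taskId here.
    console.log('run event for', e.event.taskId);
  }
}
```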

View file

@ -1,396 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/*
* This module contains the core logic for running an individual task.
* It handles the full lifecycle of a task run, including error handling,
* rescheduling, middleware application, etc.
*/
import apm from 'elastic-apm-node';
import { v4 as uuidv4 } from 'uuid';
import { withSpan } from '@kbn/apm-utils';
import { identity } from 'lodash';
import { Logger, ExecutionContextStart } from '@kbn/core/server';
import { Middleware } from '../lib/middleware';
import { asOk, asErr, eitherAsync, Result } from '../lib/result_type';
import {
TaskRun,
TaskMarkRunning,
asTaskRunEvent,
asTaskMarkRunningEvent,
startTaskTimer,
TaskTiming,
TaskPersistence,
} from '../task_events';
import { intervalFromDate } from '../lib/intervals';
import {
CancellableTask,
ConcreteTaskInstance,
isFailedRunResult,
SuccessfulRunResult,
FailedRunResult,
TaskStatus,
EphemeralTaskInstance,
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import {
asPending,
asReadyToRun,
EMPTY_RUN_RESULT,
isPending,
isReadyToRun,
TaskRunner,
TaskRunningInstance,
TaskRunResult,
TASK_MANAGER_RUN_TRANSACTION_TYPE,
TASK_MANAGER_TRANSACTION_TYPE,
TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING,
} from './task_runner';
type Opts = {
logger: Logger;
definitions: TaskTypeDictionary;
instance: EphemeralTaskInstance;
onTaskEvent?: (event: TaskRun | TaskMarkRunning) => void;
executionContext: ExecutionContextStart;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;
// ephemeral tasks cannot be rescheduled or scheduled to run again in the future
type EphemeralSuccessfulRunResult = Omit<SuccessfulRunResult, 'runAt' | 'schedule'>;
type EphemeralFailedRunResult = Omit<FailedRunResult, 'runAt' | 'schedule'>;
/**
*
* @export
* @class EphemeralTaskManagerRunner
* @implements {TaskRunner}
*/
export class EphemeralTaskManagerRunner implements TaskRunner {
private task?: CancellableTask;
private instance: TaskRunningInstance;
private definitions: TaskTypeDictionary;
private logger: Logger;
private beforeRun: Middleware['beforeRun'];
private beforeMarkRunning: Middleware['beforeMarkRunning'];
private onTaskEvent: (event: TaskRun | TaskMarkRunning) => void;
private uuid: string;
private readonly executionContext: ExecutionContextStart;
/**
* Creates an instance of EphemeralTaskManagerRunner.
* @param {Opts} opts
* @prop {Logger} logger - The task manager logger
* @prop {TaskDefinition} definition - The definition of the task being run
* @prop {EphemeralTaskInstance} instance - The record describing this particular task instance
* @prop {BeforeRunFunction} beforeRun - A function that adjusts the run context prior to running the task
* @memberof EphemeralTaskManagerRunner
*/
constructor({
instance,
definitions,
logger,
beforeRun,
beforeMarkRunning,
onTaskEvent = identity,
executionContext,
}: Opts) {
this.instance = asPending(asConcreteInstance(sanitizeInstance(instance)));
this.definitions = definitions;
this.logger = logger;
this.beforeRun = beforeRun;
this.beforeMarkRunning = beforeMarkRunning;
this.onTaskEvent = onTaskEvent;
this.executionContext = executionContext;
this.uuid = uuidv4();
}
/**
* Gets the id of this task instance.
*/
public get id() {
return this.instance.task.id;
}
/**
* Gets the execution id of this task instance.
*/
public get taskExecutionId() {
return `${this.id}::${this.uuid}`;
}
/**
* Tests whether the given execution ID identifies an execution of this same task
* @param executionId
*/
public isSameTask(executionId: string) {
return executionId.startsWith(this.id);
}
/**
* Gets the task type of this task instance.
*/
public get taskType() {
return this.instance.task.taskType;
}
/**
* Get the stage this TaskRunner is at
*/
public get stage() {
return this.instance.stage;
}
/**
* Gets the task definition from the dictionary.
*/
public get definition() {
return this.definitions.get(this.taskType);
}
/**
* Gets the time at which this task will expire.
*/
public get expiration() {
return intervalFromDate(
// if the task is running, use its startedAt; otherwise use the timestamp at
// which it was last updated
// this allows us to catch tasks that remain in Pending/Finalizing without being
// cleaned up
isReadyToRun(this.instance) ? this.instance.task.startedAt : this.instance.timestamp,
this.definition?.timeout
)!;
}
/**
* Gets the time at which the current task run started
*/
public get startedAt() {
return this.instance.task.startedAt;
}
/**
* Gets whether or not this task has run longer than its expiration setting allows.
*/
public get isExpired() {
return this.expiration < new Date();
}
/**
* Returns true whenever the task is ad hoc and has run out of attempts. When true before
* running a task, the task should be deleted instead of run.
*/
public get isAdHocTaskAndOutOfAttempts() {
return false;
}
public get isEphemeral() {
return true;
}
/**
* Returns a log-friendly representation of this task.
*/
public toString() {
return `${this.taskType} "${this.id}" (Ephemeral)`;
}
/**
* Runs the task, handling the task result, errors, etc, rescheduling if need
* be. NOTE: the time of applying the middleware's beforeRun is incorporated
* into the total timeout time the task is configured with. We may decide to
* start the timer after beforeRun resolves
*
* @returns {Promise<Result<SuccessfulRunResult, FailedRunResult>>}
*/
public async run(): Promise<Result<SuccessfulRunResult, FailedRunResult>> {
const definition = this.definition;
if (!definition) {
throw new Error(`Running ephemeral task ${this} failed because it has no definition`);
}
if (!isReadyToRun(this.instance)) {
throw new Error(
`Running ephemeral task ${this} failed as it ${
isPending(this.instance) ? `isn't ready to be run` : `has already run`
}`
);
}
this.logger.debug(`Running ephemeral task ${this}`);
const apmTrans = apm.startTransaction(this.taskType, TASK_MANAGER_RUN_TRANSACTION_TYPE, {
childOf: this.instance.task.traceparent,
});
apmTrans?.addLabels({ ephemeral: true });
const modifiedContext = await this.beforeRun({
taskInstance: asConcreteInstance(this.instance.task),
});
const stopTaskTimer = startTaskTimer();
try {
this.task = definition.createTaskRunner(modifiedContext);
const ctx = {
type: 'task manager',
name: `run ephemeral ${this.instance.task.taskType}`,
id: this.instance.task.id,
description: 'run ephemeral task',
};
const result = await this.executionContext.withContext(ctx, () =>
withSpan({ name: 'ephemeral run', type: 'task manager' }, () => this.task!.run())
);
const validatedResult = this.validateResult(result);
const processedResult = await withSpan(
{ name: 'process ephemeral result', type: 'task manager' },
() => this.processResult(validatedResult, stopTaskTimer())
);
if (apmTrans) apmTrans.end('success');
return processedResult;
} catch (err) {
this.logger.error(`Task ${this} failed: ${err}`);
// in error scenario, we can not get the RunResult
const processedResult = await withSpan(
{ name: 'process ephemeral result', type: 'task manager' },
() =>
this.processResult(
asErr({ error: err, state: modifiedContext.taskInstance.state }),
stopTaskTimer()
)
);
if (apmTrans) apmTrans.end('failure');
return processedResult;
}
}
/**
* Used by the non-ephemeral task runner
*/
public async removeTask(): Promise<void> {}
/**
* Noop for Ephemeral tasks
*
* @returns {Promise<boolean>}
*/
public async markTaskAsRunning(): Promise<boolean> {
if (!isPending(this.instance)) {
throw new Error(
`Marking ephemeral task ${this} as running has failed as it ${
isReadyToRun(this.instance) ? `is already running` : `has already run`
}`
);
}
const apmTrans = apm.startTransaction(
TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING,
TASK_MANAGER_TRANSACTION_TYPE
);
apmTrans?.addLabels({ entityId: this.taskType });
const now = new Date();
try {
const { taskInstance } = await this.beforeMarkRunning({
taskInstance: asConcreteInstance(this.instance.task),
});
this.instance = asReadyToRun({
...taskInstance,
status: TaskStatus.Running,
startedAt: now,
attempts: taskInstance.attempts + 1,
retryAt: null,
});
if (apmTrans) apmTrans.end('success');
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asOk(this.instance.task)));
return true;
} catch (error) {
if (apmTrans) apmTrans.end('failure');
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asErr(error)));
}
return false;
}
/**
* Attempts to cancel the task.
*
* @returns {Promise<void>}
*/
public async cancel() {
const { task } = this;
if (task?.cancel) {
// it will cause the task state of "running" to be cleared
this.task = undefined;
return task.cancel();
}
this.logger.debug(`The ephemeral task ${this} is not cancellable.`);
}
private validateResult(
result?: SuccessfulRunResult | FailedRunResult | void
): Result<EphemeralSuccessfulRunResult, EphemeralFailedRunResult> {
return isFailedRunResult(result)
? asErr({ ...result, error: result.error })
: asOk(result || EMPTY_RUN_RESULT);
}
private async processResult(
result: Result<EphemeralSuccessfulRunResult, EphemeralFailedRunResult>,
taskTiming: TaskTiming
): Promise<Result<SuccessfulRunResult, FailedRunResult>> {
await eitherAsync(
result,
async ({ state }: EphemeralSuccessfulRunResult) => {
this.onTaskEvent(
asTaskRunEvent(
this.id,
asOk({
task: { ...this.instance.task, state },
persistence: TaskPersistence.Ephemeral,
result: TaskRunResult.Success,
isExpired: false,
}),
taskTiming
)
);
},
async ({ error, state }: EphemeralFailedRunResult) => {
this.onTaskEvent(
asTaskRunEvent(
this.id,
asErr({
task: { ...this.instance.task, state },
persistence: TaskPersistence.Ephemeral,
result: TaskRunResult.Failed,
isExpired: false,
error,
}),
taskTiming
)
);
}
);
return result;
}
}
function sanitizeInstance(instance: EphemeralTaskInstance): EphemeralTaskInstance {
return {
...instance,
params: instance.params || {},
state: instance.state || {},
};
}
function asConcreteInstance(instance: EphemeralTaskInstance): ConcreteTaskInstance {
return {
...instance,
attempts: 0,
retryAt: null,
};
}
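
The deleted runner threads its outcomes through the `Result` type from `lib/result_type` (`asOk`, `asErr`, `eitherAsync`). Below is a minimal re-implementation sketch, assuming the `{ tag: 'ok', value }` shape visible in the mocks later in this diff; the real module may differ in detail.

```ts
interface Ok<T> { tag: 'ok'; value: T; }
interface Err<E> { tag: 'err'; error: E; }
type Result<T, E> = Ok<T> | Err<E>;

const asOk = <T>(value: T): Ok<T> => ({ tag: 'ok', value });
const asErr = <E>(error: E): Err<E> => ({ tag: 'err', error });

// Run the matching async handler for the variant, then return the result,
// mirroring how processResult emits a success or failure event above.
async function eitherAsync<T, E>(
  result: Result<T, E>,
  onOk: (value: T) => Promise<void>,
  onErr: (error: E) => Promise<void>
): Promise<Result<T, E>> {
  if (result.tag === 'ok') await onOk(result.value);
  else await onErr(result.error);
  return result;
}

async function demo() {
  const r: Result<{ state: object }, Error> = asOk({ state: { count: 1 } });
  await eitherAsync(
    r,
    async ({ state }) => console.log('task run succeeded with state', state),
    async (error) => console.error('task run failed', error)
  );
}
void demo();
```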

View file

@ -5,7 +5,6 @@
* 2.0.
*/
import { TaskErrorSource } from '../../common';
import { EphemeralTask } from '../task';
export { TaskErrorSource };
@ -23,19 +22,6 @@ export interface DecoratedError extends Error {
[source]?: TaskErrorSource;
}
export class EphemeralTaskRejectedDueToCapacityError extends Error {
private _task: EphemeralTask;
constructor(message: string, task: EphemeralTask) {
super(message);
this._task = task;
}
public get task() {
return this._task;
}
}
function isTaskManagerError(error: unknown): error is DecoratedError {
return Boolean(error && (error as DecoratedError)[code]);
}
@ -87,9 +73,3 @@ export function getErrorSource(error: Error | DecoratedError): TaskErrorSource |
export function isUserError(error: Error | DecoratedError) {
return getErrorSource(error) === TaskErrorSource.USER;
}
export function isEphemeralTaskRejectedDueToCapacityError(
error: Error | EphemeralTaskRejectedDueToCapacityError
) {
return Boolean(error && error instanceof EphemeralTaskRejectedDueToCapacityError);
}

View file

@ -79,7 +79,6 @@ export interface TaskRunner {
id: string;
taskExecutionId: string;
stage: string;
isEphemeral?: boolean;
toString: () => string;
isSameTask: (executionId: string) => boolean;
isAdHocTaskAndOutOfAttempts: boolean;

View file

@ -14,7 +14,6 @@ const createTaskSchedulingMock = () => {
ensureScheduled: jest.fn(),
schedule: jest.fn(),
runSoon: jest.fn(),
ephemeralRunNow: jest.fn(),
} as unknown as jest.Mocked<TaskScheduling>;
};

View file

@ -6,20 +6,15 @@
*/
import sinon from 'sinon';
import { BehaviorSubject, Observable, Subject } from 'rxjs';
import moment from 'moment';
import { asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { TaskScheduling } from './task_scheduling';
import { asErr, asOk } from './lib/result_type';
import { asOk } from './lib/result_type';
import { TaskStatus } from './task';
import { createInitialMiddleware } from './lib/middleware';
import { taskStoreMock } from './task_store.mock';
import { TaskRunResult } from './task_running';
import { mockLogger } from './test_utils';
import { TaskTypeDictionary } from './task_type_dictionary';
import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock';
import { taskManagerMock } from './mocks';
import { omit } from 'lodash';
@ -52,7 +47,6 @@ describe('TaskScheduling', () => {
logger: mockLogger(),
middleware: createInitialMiddleware(),
definitions,
ephemeralTaskLifecycle: ephemeralTaskLifecycleMock.create({}),
taskManagerId: '123',
};
@ -835,122 +829,6 @@ describe('TaskScheduling', () => {
});
});
describe('ephemeralRunNow', () => {
test('runs a task ephemerally', async () => {
const ephemeralEvents$ = new BehaviorSubject<Partial<TaskLifecycleEvent>>({});
const ephemeralTask = taskManagerMock.createTask({
state: {
foo: 'bar',
},
});
const customEphemeralTaskLifecycleMock = ephemeralTaskLifecycleMock.create({
events$: ephemeralEvents$ as Observable<TaskLifecycleEvent>,
});
customEphemeralTaskLifecycleMock.attemptToRun.mockImplementation((value) => {
return {
tag: 'ok',
value,
};
});
const middleware = createInitialMiddleware();
middleware.beforeSave = jest.fn().mockImplementation(async () => {
return { taskInstance: ephemeralTask };
});
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
middleware,
ephemeralTaskLifecycle: customEphemeralTaskLifecycleMock,
});
const result = taskScheduling.ephemeralRunNow(ephemeralTask);
ephemeralEvents$.next(
asTaskRunEvent(
'v4uuid',
asOk({
task: {
...ephemeralTask,
id: 'v4uuid',
},
result: TaskRunResult.Success,
persistence: TaskPersistence.Ephemeral,
isExpired: false,
})
)
);
await expect(result).resolves.toEqual({ id: 'v4uuid', state: { foo: 'bar' } });
});
test('rejects ephemeral task if lifecycle returns an error', async () => {
const ephemeralEvents$ = new Subject<TaskLifecycleEvent>();
const ephemeralTask = taskManagerMock.createTask({
state: {
foo: 'bar',
},
});
const customEphemeralTaskLifecycleMock = ephemeralTaskLifecycleMock.create({
events$: ephemeralEvents$,
});
customEphemeralTaskLifecycleMock.attemptToRun.mockImplementation((value) => {
return asErr(value);
});
const middleware = createInitialMiddleware();
middleware.beforeSave = jest.fn().mockImplementation(async () => {
return { taskInstance: ephemeralTask };
});
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
middleware,
ephemeralTaskLifecycle: customEphemeralTaskLifecycleMock,
});
const result = taskScheduling.ephemeralRunNow(ephemeralTask);
ephemeralEvents$.next(
asTaskRunEvent(
'v4uuid',
asOk({
task: {
...ephemeralTask,
id: 'v4uuid',
},
result: TaskRunResult.Failed,
persistence: TaskPersistence.Ephemeral,
isExpired: false,
})
)
);
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Ephemeral Task of type foo was rejected]`
);
});
test('rejects ephemeral task if ephemeralTaskLifecycle is not defined', async () => {
const ephemeralTask = taskManagerMock.createTask({
state: {
foo: 'bar',
},
});
const middleware = createInitialMiddleware();
middleware.beforeSave = jest.fn().mockImplementation(async () => {
return { taskInstance: ephemeralTask };
});
const taskScheduling = new TaskScheduling({
...taskSchedulingOpts,
middleware,
ephemeralTaskLifecycle: undefined,
});
const result = taskScheduling.ephemeralRunNow(ephemeralTask);
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Ephemeral Task of type foo was rejected because ephemeral tasks are not supported]`
);
});
});
describe('bulkSchedule', () => {
test('allows scheduling tasks', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);

View file

@ -5,29 +5,14 @@
* 2.0.
*/
import { filter, take } from 'rxjs';
import pMap from 'p-map';
import { v4 as uuidv4 } from 'uuid';
import { chunk, flatten, pick } from 'lodash';
import { Subject } from 'rxjs';
import { chunk, flatten } from 'lodash';
import agent from 'elastic-apm-node';
import { Logger } from '@kbn/core/server';
import { either, isErr, mapErr } from './lib/result_type';
import {
ErroredTask,
ErrResultOf,
isTaskClaimEvent,
isTaskRunEvent,
isTaskRunRequestEvent,
OkResultOf,
RanTask,
} from './task_events';
import { Middleware } from './lib/middleware';
import { parseIntervalAsMillisecond } from './lib/intervals';
import {
ConcreteTaskInstance,
EphemeralTask,
IntervalSchedule,
TaskInstanceWithDeprecatedFields,
TaskInstanceWithId,
@ -35,9 +20,6 @@ import {
} from './task';
import { TaskStore } from './task_store';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
import { TaskLifecycleEvent } from './polling_lifecycle';
import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
import { EphemeralTaskRejectedDueToCapacityError } from './task_running';
import { retryableBulkUpdate } from './lib/retryable_bulk_update';
import { ErrorOutput } from './lib/bulk_operation_buffer';
@ -46,7 +28,6 @@ const BULK_ACTION_SIZE = 100;
export interface TaskSchedulingOpts {
logger: Logger;
taskStore: TaskStore;
ephemeralTaskLifecycle?: EphemeralTaskLifecycle;
middleware: Middleware;
taskManagerId: string;
}
@ -76,10 +57,8 @@ export interface RunNowResult {
export class TaskScheduling {
private store: TaskStore;
private ephemeralTaskLifecycle?: EphemeralTaskLifecycle;
private logger: Logger;
private middleware: Middleware;
private taskManagerId: string;
/**
* Initializes the task manager, preventing any further addition of middleware,
@ -89,9 +68,7 @@ export class TaskScheduling {
constructor(opts: TaskSchedulingOpts) {
this.logger = opts.logger;
this.middleware = opts.middleware;
this.ephemeralTaskLifecycle = opts.ephemeralTaskLifecycle;
this.store = opts.taskStore;
this.taskManagerId = opts.taskManagerId;
}
/**
@ -284,68 +261,6 @@ export class TaskScheduling {
return { id: task.id };
}
/**
* Run an ad-hoc task in memory without persisting it into ES or distributing the load across the cluster.
*
* @param task - The ephemeral task being queued.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async ephemeralRunNow(
task: EphemeralTask,
options?: Record<string, unknown>
): Promise<RunNowResult> {
if (!this.ephemeralTaskLifecycle) {
throw new EphemeralTaskRejectedDueToCapacityError(
`Ephemeral Task of type ${task.taskType} was rejected because ephemeral tasks are not supported`,
task
);
}
const id = uuidv4();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance: task,
});
return new Promise(async (resolve, reject) => {
try {
        // The promise returned from this function resolves once the awaitTaskRunResult promise resolves.
        // We deliberately don't await it here, so that later execution can happen in parallel.
        // awaitTaskRunResult resolves once the ephemeral task executes successfully (technically, once a TaskEventType.TASK_RUN event is emitted with the same id).
        // The task doesn't even enter the queue until the subsequent this.ephemeralTaskLifecycle.attemptToRun call puts it there.
        // The ordering matters because of timing: within attemptToRun, the task may be queued and processed before this function call returns anything.
        // If that happened and we only started listening afterwards, awaitTaskRunResult would hang because the task had already completed.
        // So we must listen for completion before adding the task to the queue.
const { cancel, resolveOnCancel } = cancellablePromise();
this.awaitTaskRunResult(id, resolveOnCancel)
.then((arg: RunNowResult) => {
resolve(arg);
})
.catch((err: Error) => {
reject(err);
});
const attemptToRunResult = this.ephemeralTaskLifecycle!.attemptToRun({
id,
scheduledAt: new Date(),
runAt: new Date(),
status: TaskStatus.Idle,
ownerId: this.taskManagerId,
...modifiedTask,
});
if (isErr(attemptToRunResult)) {
cancel();
reject(
new EphemeralTaskRejectedDueToCapacityError(
`Ephemeral Task of type ${task.taskType} was rejected`,
task
)
);
}
} catch (error) {
reject(error);
}
});
}
/**
* Schedules a task with an Id
*
@ -366,63 +281,6 @@ export class TaskScheduling {
}
}
private awaitTaskRunResult(taskId: string, cancel?: Promise<void>): Promise<RunNowResult> {
return new Promise((resolve, reject) => {
if (!this.ephemeralTaskLifecycle) {
reject(
new Error(
`Failed to run task "${taskId}" because ephemeral tasks are not supported. Rescheduled the task to ensure it is picked up as soon as possible.`
)
);
}
// listen for all events related to the current task
const subscription = this.ephemeralTaskLifecycle!.events.pipe(
filter(({ id }: TaskLifecycleEvent) => id === taskId)
).subscribe((taskEvent: TaskLifecycleEvent) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: Error) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
}, taskEvent.event);
} else {
either<OkResultOf<TaskLifecycleEvent>, ErrResultOf<TaskLifecycleEvent>>(
taskEvent.event,
(taskInstance: OkResultOf<TaskLifecycleEvent>) => {
// resolve if the task has run successfully
if (isTaskRunEvent(taskEvent)) {
resolve(pick((taskInstance as RanTask).task, ['id', 'state']));
subscription.unsubscribe();
}
},
async (errorResult: ErrResultOf<TaskLifecycleEvent>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
return reject(
new Error(
`Failed to run task "${taskId}": ${
isTaskRunRequestEvent(taskEvent)
? `Task Manager is at capacity, please try again later`
: isTaskRunEvent(taskEvent)
? `${(errorResult as ErroredTask).error}`
: `${errorResult}`
}`
)
);
}
);
}
});
if (cancel) {
cancel
.then(() => {
subscription.unsubscribe();
})
.catch(() => {});
}
});
}
private async getNonRunningTask(taskId: string) {
const task = await this.store.get(taskId);
switch (task.status) {
@ -438,17 +296,6 @@ export class TaskScheduling {
}
}
const cancellablePromise = () => {
const boolStream = new Subject<boolean>();
return {
cancel: () => boolStream.next(true),
resolveOnCancel: boolStream
.pipe(take(1))
.toPromise()
.then(() => {}),
};
};
const randomlyOffsetRunTimestamp: (task: ConcreteTaskInstance) => ConcreteTaskInstance = (task) => {
const now = Date.now();
const maximumOffsetTimestamp = now + 1000 * 60 * 5; // now + 5 minutes
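
The `cancellablePromise` helper deleted above is a small but reusable pattern: a `Subject` acts as a one-shot cancellation signal whose first emission settles a promise, letting `ephemeralRunNow` tear down its event subscription when enqueueing fails. A sketch follows, using `firstValueFrom`, the rxjs v7 replacement for the deprecated `.toPromise()` the original used.

```ts
import { Subject, take, firstValueFrom } from 'rxjs';

const cancellablePromise = () => {
  const cancel$ = new Subject<boolean>();
  return {
    // Emit once to signal cancellation.
    cancel: () => cancel$.next(true),
    // Settles the first time cancel() is called; never settles otherwise,
    // matching the deleted implementation's behavior.
    resolveOnCancel: firstValueFrom(cancel$.pipe(take(1))).then(() => {}),
  };
};

// Usage mirroring ephemeralRunNow above: start listening for a task result,
// but tear the listener down if enqueueing fails.
const { cancel, resolveOnCancel } = cancellablePromise();
resolveOnCancel.then(() => console.log('listener cleaned up'));
cancel(); // simulate attemptToRun returning an error
```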

View file

@ -26,43 +26,6 @@ describe('registerTaskManagerUsageCollector', () => {
let collector: Collector<unknown>;
const logger = loggingSystemMock.createLogger();
it('should report telemetry on the ephemeral queue', async () => {
const monitoringStats$ = new Subject<MonitoredHealth>();
const monitoringUtilization$ = new Subject<MonitoredUtilization>();
const usageCollectionMock = createUsageCollectionSetupMock();
const fetchContext = createCollectorFetchContextMock();
usageCollectionMock.makeUsageCollector.mockImplementation((config) => {
collector = new Collector(logger, config);
return createUsageCollectionSetupMock().makeUsageCollector(config);
});
registerTaskManagerUsageCollector(
usageCollectionMock,
monitoringStats$,
monitoringUtilization$,
true,
10,
[]
);
const mockHealth = getMockMonitoredHealth();
monitoringStats$.next(mockHealth);
const mockUtilization = getMockMonitoredUtilization();
monitoringUtilization$.next(mockUtilization);
await sleep(1001);
expect(usageCollectionMock.makeUsageCollector).toBeCalled();
const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage;
expect(telemetry.ephemeral_tasks_enabled).toBe(true);
expect(telemetry.ephemeral_request_capacity).toBe(10);
expect(telemetry.ephemeral_stats).toMatchObject({
status: mockHealth.stats.ephemeral?.status,
load: mockHealth.stats.ephemeral?.value.load,
executions_per_cycle: mockHealth.stats.ephemeral?.value.executionsPerCycle,
queued_tasks: mockHealth.stats.ephemeral?.value.queuedTasks,
});
});
it('should report telemetry on the excluded task types', async () => {
const monitoringStats$ = new Subject<MonitoredHealth>();
const monitoringUtilization$ = new Subject<MonitoredUtilization>();
@ -77,8 +40,6 @@ describe('registerTaskManagerUsageCollector', () => {
usageCollectionMock,
monitoringStats$,
monitoringUtilization$,
true,
10,
['actions:*']
);
@ -107,8 +68,6 @@ describe('registerTaskManagerUsageCollector', () => {
usageCollectionMock,
monitoringStats$,
monitoringUtilization$,
true,
10,
['actions:*']
);
@ -146,8 +105,6 @@ describe('registerTaskManagerUsageCollector', () => {
usageCollectionMock,
monitoringStats$,
monitoringUtilization$,
true,
10,
['actions:*']
);
@ -216,30 +173,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
},
},
},
ephemeral: {
status: HealthStatus.OK,
timestamp: new Date().toISOString(),
value: {
load: {
p50: 4,
p90: 6,
p95: 6,
p99: 6,
},
executionsPerCycle: {
p50: 4,
p90: 6,
p95: 6,
p99: 6,
},
queuedTasks: {
p50: 4,
p90: 6,
p95: 6,
p99: 6,
},
},
},
runtime: {
timestamp: new Date().toISOString(),
status: HealthStatus.OK,
@ -263,7 +196,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
persistence: {
[TaskPersistence.Recurring]: 10,
[TaskPersistence.NonRecurring]: 10,
[TaskPersistence.Ephemeral]: 10,
},
result_frequency_percent_as_number: {},
},

View file

@ -16,8 +16,6 @@ export function createTaskManagerUsageCollector(
usageCollection: UsageCollectionSetup,
monitoringStats$: Observable<MonitoredHealth>,
monitoredUtilization$: Observable<MonitoredUtilization>,
ephemeralTasksEnabled: boolean,
ephemeralRequestCapacity: number,
excludeTaskTypes: string[]
) {
let lastMonitoredHealth: MonitoredHealth | null = null;
@ -37,29 +35,6 @@ export function createTaskManagerUsageCollector(
},
fetch: async () => {
return {
ephemeral_tasks_enabled: ephemeralTasksEnabled,
ephemeral_request_capacity: ephemeralRequestCapacity,
ephemeral_stats: {
status: lastMonitoredHealth?.stats.ephemeral?.status ?? '',
queued_tasks: {
p50: lastMonitoredHealth?.stats.ephemeral?.value.queuedTasks.p50 ?? 0,
p90: lastMonitoredHealth?.stats.ephemeral?.value.queuedTasks.p90 ?? 0,
p95: lastMonitoredHealth?.stats.ephemeral?.value.queuedTasks.p95 ?? 0,
p99: lastMonitoredHealth?.stats.ephemeral?.value.queuedTasks.p99 ?? 0,
},
load: {
p50: lastMonitoredHealth?.stats.ephemeral?.value.load.p50 ?? 0,
p90: lastMonitoredHealth?.stats.ephemeral?.value.load.p90 ?? 0,
p95: lastMonitoredHealth?.stats.ephemeral?.value.load.p95 ?? 0,
p99: lastMonitoredHealth?.stats.ephemeral?.value.load.p99 ?? 0,
},
executions_per_cycle: {
p50: lastMonitoredHealth?.stats.ephemeral?.value.executionsPerCycle.p50 ?? 0,
p90: lastMonitoredHealth?.stats.ephemeral?.value.executionsPerCycle.p90 ?? 0,
p95: lastMonitoredHealth?.stats.ephemeral?.value.executionsPerCycle.p95 ?? 0,
p99: lastMonitoredHealth?.stats.ephemeral?.value.executionsPerCycle.p99 ?? 0,
},
},
task_type_exclusion: excludeTaskTypes,
failed_tasks: Object.entries(lastMonitoredHealth?.stats.workload?.value.task_types!).reduce(
(numb, [key, val]) => {
@ -88,29 +63,6 @@ export function createTaskManagerUsageCollector(
};
},
schema: {
ephemeral_tasks_enabled: { type: 'boolean' },
ephemeral_request_capacity: { type: 'short' },
ephemeral_stats: {
status: { type: 'keyword' },
queued_tasks: {
p50: { type: 'long' },
p90: { type: 'long' },
p95: { type: 'long' },
p99: { type: 'long' },
},
load: {
p50: { type: 'long' },
p90: { type: 'long' },
p95: { type: 'long' },
p99: { type: 'long' },
},
executions_per_cycle: {
p50: { type: 'long' },
p90: { type: 'long' },
p95: { type: 'long' },
p99: { type: 'long' },
},
},
task_type_exclusion: { type: 'array', items: { type: 'keyword' } },
failed_tasks: { type: 'long' },
recurring_tasks: {
@ -130,16 +82,12 @@ export function registerTaskManagerUsageCollector(
usageCollection: UsageCollectionSetup,
monitoringStats$: Observable<MonitoredHealth>,
monitoredUtilization$: Observable<MonitoredUtilization>,
ephemeralTasksEnabled: boolean,
ephemeralRequestCapacity: number,
excludeTaskTypes: string[]
) {
const collector = createTaskManagerUsageCollector(
usageCollection,
monitoringStats$,
monitoredUtilization$,
ephemeralTasksEnabled,
ephemeralRequestCapacity,
excludeTaskTypes
);
usageCollection.registerCollector(collector);

View file

@ -7,29 +7,6 @@
export interface TaskManagerUsage {
task_type_exclusion: string[];
ephemeral_tasks_enabled: boolean;
ephemeral_request_capacity: number;
ephemeral_stats: {
status: string;
queued_tasks: {
p50: number;
p90: number;
p95: number;
p99: number;
};
load: {
p50: number;
p90: number;
p95: number;
p99: number;
};
executions_per_cycle: {
p50: number;
p90: number;
p95: number;
p99: number;
};
};
failed_tasks: number;
recurring_tasks: {
actual_service_time: number;

View file

@ -19812,67 +19812,6 @@
},
"task_manager": {
"properties": {
"ephemeral_tasks_enabled": {
"type": "boolean"
},
"ephemeral_request_capacity": {
"type": "short"
},
"ephemeral_stats": {
"properties": {
"status": {
"type": "keyword"
},
"queued_tasks": {
"properties": {
"p50": {
"type": "long"
},
"p90": {
"type": "long"
},
"p95": {
"type": "long"
},
"p99": {
"type": "long"
}
}
},
"load": {
"properties": {
"p50": {
"type": "long"
},
"p90": {
"type": "long"
},
"p95": {
"type": "long"
},
"p99": {
"type": "long"
}
}
},
"executions_per_cycle": {
"properties": {
"p50": {
"type": "long"
},
"p90": {
"type": "long"
},
"p95": {
"type": "long"
},
"p99": {
"type": "long"
}
}
}
}
},
"task_type_exclusion": {
"type": "array",
"items": {

View file

@ -34,8 +34,6 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
'--xpack.eventLog.logEntries=true',
'--xpack.eventLog.indexEntries=true',
'--xpack.task_manager.monitored_aggregated_stats_refresh_rate=5000',
'--xpack.task_manager.ephemeral_tasks.enabled=false',
'--xpack.task_manager.ephemeral_tasks.request_capacity=100',
`--xpack.stack_connectors.enableExperimental=${JSON.stringify([
'crowdstrikeConnectorOn',
'inferenceConnectorOn',

View file

@ -216,45 +216,6 @@ export function initRoutes(
}
);
router.post(
{
path: `/api/sample_tasks/ephemeral_run_now`,
validate: {
body: schema.object({
task: schema.object({
taskType: schema.string(),
state: schema.recordOf(schema.string(), schema.any()),
params: schema.recordOf(schema.string(), schema.any()),
}),
}),
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<
any,
any,
{
task: {
taskType: string;
params: Record<string, any>;
state: Record<string, any>;
};
},
any
>,
res: KibanaResponseFactory
): Promise<IKibanaResponse<any>> {
const { task } = req.body;
try {
const taskManager = await taskManagerStart;
return res.ok({ body: await taskManager.ephemeralRunNow(task) });
} catch (err) {
return res.ok({ body: { task, error: `${err}` } });
}
}
);
router.post(
{
path: `/api/sample_tasks/ensure_scheduled`,

View file

@ -15,7 +15,6 @@ import {
TaskManagerSetupContract,
TaskManagerStartContract,
ConcreteTaskInstance,
EphemeralTask,
} from '@kbn/task-manager-plugin/server';
import { DEFAULT_MAX_WORKERS } from '@kbn/task-manager-plugin/server/config';
import { getDeleteTaskRunResult, TaskPriority } from '@kbn/task-manager-plugin/server/task';
@ -40,8 +39,6 @@ export class SampleTaskManagerFixturePlugin
const taskTestingEvents = new EventEmitter();
taskTestingEvents.setMaxListeners(DEFAULT_MAX_WORKERS * 2);
const tmStart = this.taskManagerStart;
const defaultSampleTaskConfig = {
timeout: '1m',
// This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc)
@ -345,37 +342,6 @@ export class SampleTaskManagerFixturePlugin
'A task that can only have two concurrent instance and tracks its execution timing.',
...taskWithTiming,
},
taskWhichExecutesOtherTasksEphemerally: {
title: 'Task Which Executes Other Tasks Ephemerally',
description: 'A sample task used to validate how ephemeral tasks are executed.',
maxAttempts: 1,
timeout: '60s',
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => ({
async run() {
const {
params: { tasks = [] },
} = taskInstance;
const tm = await tmStart;
const executions = await Promise.all(
(tasks as EphemeralTask[]).map(async (task) => {
return tm
.ephemeralRunNow(task)
.then((result) => ({
result,
}))
.catch((error) => ({
error,
}));
})
);
return {
state: { executions },
};
},
}),
},
});
taskManager.addMiddleware({

View file

@ -30,7 +30,6 @@ export default function ({ getService }: FtrProviderContext) {
'sampleTaskWithLimitedConcurrency',
'sampleTaskWithSingleConcurrency',
'singleAttemptSampleTask',
'taskWhichExecutesOtherTasksEphemerally',
'timedTask',
'timedTaskWithLimitedConcurrency',
'timedTaskWithSingleConcurrency',

@@ -302,7 +302,6 @@ export default function ({ getService }: FtrProviderContext) {
expect(typeof execution.duration.sampleTask.p95).to.eql('number');
expect(typeof execution.duration.sampleTask.p99).to.eql('number');
expect(typeof execution.persistence.ephemeral).to.eql('number');
expect(typeof execution.persistence.non_recurring).to.eql('number');
expect(typeof execution.persistence.recurring).to.eql('number');
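The assertion on the `ephemeral` bucket goes away because the persistence classification itself narrows. A sketch of the resulting shape follows; the enum name matches task manager's `TaskPersistence`, but treat the exact members as illustrative rather than the plugin's source.

```ts
// Sketch of the narrowed classification after the removal; illustrative,
// not the plugin's exact source.
enum TaskPersistence {
  Recurring = 'recurring',
  NonRecurring = 'non_recurring',
  // Ephemeral = 'ephemeral' was removed along with ephemeral task support.
}

// Health stats now report one counter per remaining bucket.
type PersistenceStats = Record<TaskPersistence, number>;
const example: PersistenceStats = { recurring: 12, non_recurring: 3 };
```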

@@ -197,20 +197,6 @@ export default function ({ getService }: FtrProviderContext) {
.then((response: { body: BulkUpdateTaskResult }) => response.body);
}
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// function runEphemeralTaskNow(task: {
// taskType: string;
// params: Record<string, any>;
// state: Record<string, any>;
// }) {
// return supertest
// .post('/api/sample_tasks/ephemeral_run_now')
// .set('kbn-xsrf', 'xxx')
// .send({ task })
// .expect(200)
// .then((response) => response.body);
// }
function scheduleTaskIfNotExists(task: Partial<ConcreteTaskInstance>) {
return supertest
.post('/api/sample_tasks/ensure_scheduled')
@@ -919,196 +905,6 @@ export default function ({ getService }: FtrProviderContext) {
expect(task.runAt).to.eql(scheduledRunAt);
});
});
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('should return the resulting task state when asked to run an ephemeral task now', async () => {
// const ephemeralTask = await runEphemeralTaskNow({
// taskType: 'sampleTask',
// params: {},
// state: {},
// });
// await retry.try(async () => {
// expect(
// (await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === ephemeralTask.id)
// .length
// ).to.eql(1);
// expect(ephemeralTask.state.count).to.eql(1);
// });
// const secondEphemeralTask = await runEphemeralTaskNow({
// taskType: 'sampleTask',
// params: {},
// // pass state from previous ephemeral run as input for the second run
// state: ephemeralTask.state,
// });
// // ensure state is cumulative
// expect(secondEphemeralTask.state.count).to.eql(2);
// await retry.try(async () => {
// // ensure new id is produced for second task execution
// expect(
// (await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === ephemeralTask.id)
// .length
// ).to.eql(1);
// expect(
// (await historyDocs()).filter(
// (taskDoc) => taskDoc._source.taskId === secondEphemeralTask.id
// ).length
// ).to.eql(1);
// });
// });
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('Ephemeral task run should only run one instance of a task if its maxConcurrency is 1', async () => {
// const ephemeralTaskWithSingleConcurrency: {
// state: {
// executions: Array<{
// result: {
// id: string;
// state: {
// timings: Array<{
// start: number;
// stop: number;
// }>;
// };
// };
// }>;
// };
// } = await runEphemeralTaskNow({
// taskType: 'taskWhichExecutesOtherTasksEphemerally',
// params: {
// tasks: [
// {
// taskType: 'timedTaskWithSingleConcurrency',
// params: { delay: 1000 },
// state: {},
// },
// {
// taskType: 'timedTaskWithSingleConcurrency',
// params: { delay: 1000 },
// state: {},
// },
// {
// taskType: 'timedTaskWithSingleConcurrency',
// params: { delay: 1000 },
// state: {},
// },
// {
// taskType: 'timedTaskWithSingleConcurrency',
// params: { delay: 1000 },
// state: {},
// },
// ],
// },
// state: {},
// });
// ensureOverlappingTasksDontExceedThreshold(
// ephemeralTaskWithSingleConcurrency.state.executions,
// // make sure no task overlaps with any other task
// 0
// );
// });
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('Ephemeral task run should only run as many instances of a task as its maxConcurrency will allow', async () => {
// const ephemeralTaskWithSingleConcurrency: {
// state: {
// executions: Array<{
// result: {
// id: string;
// state: {
// timings: Array<{
// start: number;
// stop: number;
// }>;
// };
// };
// }>;
// };
// } = await runEphemeralTaskNow({
// taskType: 'taskWhichExecutesOtherTasksEphemerally',
// params: {
// tasks: [
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// ],
// },
// state: {},
// });
// ensureOverlappingTasksDontExceedThreshold(
// ephemeralTaskWithSingleConcurrency.state.executions,
// // make sure each task intersects with, at most, 1 other task
// 1
// );
// });
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('Ephemeral task executions cannot exceed the max workers in Task Manager', async () => {
// const ephemeralTaskWithSingleConcurrency: {
// state: {
// executions: Array<{
// result: {
// id: string;
// state: {
// timings: Array<{
// start: number;
// stop: number;
// }>;
// };
// };
// }>;
// };
// } = await runEphemeralTaskNow({
// taskType: 'taskWhichExecutesOtherTasksEphemerally',
// params: {
// tasks: times(20, () => ({
// taskType: 'timedTask',
// params: { delay: 100 },
// state: {},
// })),
// },
// state: {},
// });
// ensureOverlappingTasksDontExceedThreshold(
// ephemeralTaskWithSingleConcurrency.state.executions,
// // make sure each task intersects with, at most, 9 other tasks (as max workers is 10)
// 9
// );
// });
});
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
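The commented-out concurrency assertions all lean on an overlap-counting helper. Conceptually it does something like the sketch below; this illustrates the technique and is not the fixture's actual `ensureOverlappingTasksDontExceedThreshold` implementation.

```ts
// Sketch of the overlap check the commented-out tests rely on: count,
// for each timing window, how many other windows intersect it, and
// fail if any count exceeds the allowed threshold.
function ensureOverlapsWithinThreshold(
  timings: Array<{ start: number; stop: number }>,
  threshold: number
) {
  for (const t of timings) {
    const overlaps = timings.filter(
      (other) => other !== t && other.start < t.stop && t.start < other.stop
    ).length;
    if (overlaps > threshold) {
      throw new Error(`expected at most ${threshold} overlapping tasks, got ${overlaps}`);
    }
  }
}
```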

@@ -30,8 +30,6 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
'--xpack.eventLog.indexEntries=true',
'--xpack.task_manager.claim_strategy="update_by_query"',
'--xpack.task_manager.monitored_aggregated_stats_refresh_rate=5000',
'--xpack.task_manager.ephemeral_tasks.enabled=false',
'--xpack.task_manager.ephemeral_tasks.request_capacity=100',
'--xpack.task_manager.metrics_reset_interval=40000',
`--xpack.stack_connectors.enableExperimental=${JSON.stringify([
'crowdstrikeConnectorOn',

@@ -219,45 +219,6 @@ export function initRoutes(
}
);
router.post(
{
path: `/api/sample_tasks/ephemeral_run_now`,
validate: {
body: schema.object({
task: schema.object({
taskType: schema.string(),
state: schema.recordOf(schema.string(), schema.any()),
params: schema.recordOf(schema.string(), schema.any()),
}),
}),
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<
any,
any,
{
task: {
taskType: string;
params: Record<string, any>;
state: Record<string, any>;
};
},
any
>,
res: KibanaResponseFactory
): Promise<IKibanaResponse<any>> {
const { task } = req.body;
try {
const taskManager = await taskManagerStart;
return res.ok({ body: await taskManager.ephemeralRunNow(task) });
} catch (err) {
return res.ok({ body: { task, error: `${err}` } });
}
}
);
router.post(
{
path: `/api/sample_tasks/ensure_scheduled`,

@@ -15,7 +15,6 @@ import {
TaskManagerSetupContract,
TaskManagerStartContract,
ConcreteTaskInstance,
EphemeralTask,
} from '@kbn/task-manager-plugin/server';
import { DEFAULT_MAX_WORKERS } from '@kbn/task-manager-plugin/server/config';
import { TaskPriority } from '@kbn/task-manager-plugin/server/task';
@@ -45,8 +44,6 @@ export class SampleTaskManagerFixturePlugin
const taskTestingEvents = new EventEmitter();
taskTestingEvents.setMaxListeners(DEFAULT_MAX_WORKERS * 2);
const tmStart = this.taskManagerStart;
const defaultSampleTaskConfig = {
timeout: '1m',
// This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc)
@@ -311,37 +308,6 @@ export class SampleTaskManagerFixturePlugin
'A task that can only have two concurrent instances and tracks its execution timing.',
...taskWithTiming,
},
taskWhichExecutesOtherTasksEphemerally: {
title: 'Task Which Executes Other Tasks Ephemerally',
description: 'A sample task used to validate how ephemeral tasks are executed.',
maxAttempts: 1,
timeout: '60s',
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => ({
async run() {
const {
params: { tasks = [] },
} = taskInstance;
const tm = await tmStart;
const executions = await Promise.all(
(tasks as EphemeralTask[]).map(async (task) => {
return tm
.ephemeralRunNow(task)
.then((result) => ({
result,
}))
.catch((error) => ({
error,
}));
})
);
return {
state: { executions },
};
},
}),
},
});
taskManager.addMiddleware({

@@ -299,7 +299,6 @@ export default function ({ getService }: FtrProviderContext) {
expect(typeof execution.duration.sampleTask.p95).to.eql('number');
expect(typeof execution.duration.sampleTask.p99).to.eql('number');
expect(typeof execution.persistence.ephemeral).to.eql('number');
expect(typeof execution.persistence.non_recurring).to.eql('number');
expect(typeof execution.persistence.recurring).to.eql('number');

@@ -200,20 +200,6 @@ export default function ({ getService }: FtrProviderContext) {
.then((response: { body: BulkUpdateTaskResult }) => response.body);
}
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// function runEphemeralTaskNow(task: {
// taskType: string;
// params: Record<string, any>;
// state: Record<string, any>;
// }) {
// return supertest
// .post('/api/sample_tasks/ephemeral_run_now')
// .set('kbn-xsrf', 'xxx')
// .send({ task })
// .expect(200)
// .then((response) => response.body);
// }
function scheduleTaskIfNotExists(task: Partial<ConcreteTaskInstance>) {
return supertest
.post('/api/sample_tasks/ensure_scheduled')
@@ -915,196 +901,6 @@ export default function ({ getService }: FtrProviderContext) {
expect(task.runAt).to.eql(scheduledRunAt);
});
});
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('should return the resulting task state when asked to run an ephemeral task now', async () => {
// const ephemeralTask = await runEphemeralTaskNow({
// taskType: 'sampleTask',
// params: {},
// state: {},
// });
// await retry.try(async () => {
// expect(
// (await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === ephemeralTask.id)
// .length
// ).to.eql(1);
// expect(ephemeralTask.state.count).to.eql(1);
// });
// const secondEphemeralTask = await runEphemeralTaskNow({
// taskType: 'sampleTask',
// params: {},
// // pass state from previous ephemeral run as input for the second run
// state: ephemeralTask.state,
// });
// // ensure state is cumulative
// expect(secondEphemeralTask.state.count).to.eql(2);
// await retry.try(async () => {
// // ensure new id is produced for second task execution
// expect(
// (await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === ephemeralTask.id)
// .length
// ).to.eql(1);
// expect(
// (await historyDocs()).filter(
// (taskDoc) => taskDoc._source.taskId === secondEphemeralTask.id
// ).length
// ).to.eql(1);
// });
// });
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('Ephemeral task run should only run one instance of a task if its maxConcurrency is 1', async () => {
// const ephemeralTaskWithSingleConcurrency: {
// state: {
// executions: Array<{
// result: {
// id: string;
// state: {
// timings: Array<{
// start: number;
// stop: number;
// }>;
// };
// };
// }>;
// };
// } = await runEphemeralTaskNow({
// taskType: 'taskWhichExecutesOtherTasksEphemerally',
// params: {
// tasks: [
// {
// taskType: 'timedTaskWithSingleConcurrency',
// params: { delay: 1000 },
// state: {},
// },
// {
// taskType: 'timedTaskWithSingleConcurrency',
// params: { delay: 1000 },
// state: {},
// },
// {
// taskType: 'timedTaskWithSingleConcurrency',
// params: { delay: 1000 },
// state: {},
// },
// {
// taskType: 'timedTaskWithSingleConcurrency',
// params: { delay: 1000 },
// state: {},
// },
// ],
// },
// state: {},
// });
// ensureOverlappingTasksDontExceedThreshold(
// ephemeralTaskWithSingleConcurrency.state.executions,
// // make sure no task overlaps with any other task
// 0
// );
// });
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('Ephemeral task run should only run as many instances of a task as its maxConcurrency will allow', async () => {
// const ephemeralTaskWithSingleConcurrency: {
// state: {
// executions: Array<{
// result: {
// id: string;
// state: {
// timings: Array<{
// start: number;
// stop: number;
// }>;
// };
// };
// }>;
// };
// } = await runEphemeralTaskNow({
// taskType: 'taskWhichExecutesOtherTasksEphemerally',
// params: {
// tasks: [
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// {
// taskType: 'timedTaskWithLimitedConcurrency',
// params: { delay: 100 },
// state: {},
// },
// ],
// },
// state: {},
// });
// ensureOverlappingTasksDontExceedThreshold(
// ephemeralTaskWithSingleConcurrency.state.executions,
// // make sure each task intersects with, at most, 1 other task
// 1
// );
// });
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('Ephemeral task executions cannot exceed the max workers in Task Manager', async () => {
// const ephemeralTaskWithSingleConcurrency: {
// state: {
// executions: Array<{
// result: {
// id: string;
// state: {
// timings: Array<{
// start: number;
// stop: number;
// }>;
// };
// };
// }>;
// };
// } = await runEphemeralTaskNow({
// taskType: 'taskWhichExecutesOtherTasksEphemerally',
// params: {
// tasks: times(20, () => ({
// taskType: 'timedTask',
// params: { delay: 100 },
// state: {},
// })),
// },
// state: {},
// });
// ensureOverlappingTasksDontExceedThreshold(
// ephemeralTaskWithSingleConcurrency.state.executions,
// // make sure each task intersects with, at most, 9 other tasks (as max workers is 10)
// 9
// );
// });
});
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139