Reapply "[Response Ops][Task Manager] Setting task status directly to running in mget claim strategy (#192303)

Re-doing this PR: https://github.com/elastic/kibana/pull/191669

Reverted because it was causing a flaky test. After a lot of
investigation, it looks like the flakiness was caused by interference
from long-running tasks scheduled as part of other tests. The task
partitions test uses task IDs `1`, `2` and `3` and the tasks were being
short circuited when there were other tasks with UUIDs that started with
`1`, `2` or `3` due to the logic in the task runner that tries to
prevent duplicate recurring tasks from running. That logic just used
`startsWith` to test for duplicates where the identifier is
`${task.id}::${task.executionUUID}`. Updated that logic instead to check
for duplicate `task.id` instead of just using `startsWith` in this
commit:
1646ae9170

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2024-09-09 12:12:25 -04:00 committed by GitHub
parent 7149a869e4
commit 2c5c8adf81
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 370 additions and 167 deletions

View file

@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import sinon from 'sinon';
import { calculateDelayBasedOnAttempts, getRetryDate } from './get_retry_at';
import { createRetryableError } from '../task_running';
let fakeTimer: sinon.SinonFakeTimers;
describe('calculateDelayBasedOnAttempts', () => {
it('returns 30s on the first attempt', () => {
expect(calculateDelayBasedOnAttempts(1)).toBe(30000);
});
it('returns delay with jitter', () => {
const delay = calculateDelayBasedOnAttempts(5);
// with jitter should be random between 0 and 40 min (inclusive)
expect(delay).toBeGreaterThanOrEqual(0);
expect(delay).toBeLessThanOrEqual(2400000);
});
it('returns delay capped at 1 hour', () => {
const delay = calculateDelayBasedOnAttempts(10);
// with jitter should be random between 0 and 1 hr (inclusive)
expect(delay).toBeGreaterThanOrEqual(0);
expect(delay).toBeLessThanOrEqual(60 * 60 * 1000);
});
});
describe('getRetryDate', () => {
beforeAll(() => {
fakeTimer = sinon.useFakeTimers(new Date('2021-01-01T12:00:00.000Z'));
});
afterAll(() => fakeTimer.restore());
it('returns retry date based on number of attempts if error is not retryable', () => {
expect(getRetryDate({ error: new Error('foo'), attempts: 1 })).toEqual(
new Date('2021-01-01T12:00:30.000Z')
);
});
it('returns retry date based on number of attempts and add duration if error is not retryable', () => {
expect(getRetryDate({ error: new Error('foo'), attempts: 1, addDuration: '5m' })).toEqual(
new Date('2021-01-01T12:05:30.000Z')
);
});
it('returns retry date for retryable error with retry date', () => {
expect(
getRetryDate({
error: createRetryableError(new Error('foo'), new Date('2021-02-01T12:00:00.000Z')),
attempts: 1,
})
).toEqual(new Date('2021-02-01T12:00:00.000Z'));
});
it('returns retry date based on number of attempts for retryable error with retry=true', () => {
expect(
getRetryDate({
error: createRetryableError(new Error('foo'), true),
attempts: 1,
})
).toEqual(new Date('2021-01-01T12:00:30.000Z'));
});
it('returns retry date based on number of attempts and add duration for retryable error with retry=true', () => {
expect(
getRetryDate({
error: createRetryableError(new Error('foo'), true),
attempts: 1,
addDuration: '5m',
})
).toEqual(new Date('2021-01-01T12:05:30.000Z'));
});
it('returns undefined for retryable error with retry=false', () => {
expect(
getRetryDate({
error: createRetryableError(new Error('foo'), false),
attempts: 1,
})
).toBeUndefined();
});
});

View file

@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { random } from 'lodash';
import { ConcreteTaskInstance, DEFAULT_TIMEOUT, TaskDefinition } from '../task';
import { isRetryableError } from '../task_running';
import { intervalFromDate, maxIntervalFromDate } from './intervals';
export function getRetryAt(
task: ConcreteTaskInstance,
taskDefinition: TaskDefinition | undefined
): Date | undefined {
const taskTimeout = getTimeout(task, taskDefinition);
if (task.schedule) {
return maxIntervalFromDate(new Date(), task.schedule.interval, taskTimeout);
}
return getRetryDate({
attempts: task.attempts + 1,
// Fake an error. This allows retry logic when tasks keep timing out
// and lets us set a proper "retryAt" value each time.
error: new Error('Task timeout'),
addDuration: taskTimeout,
});
}
export function getRetryDate({
error,
attempts,
addDuration,
}: {
error: Error;
attempts: number;
addDuration?: string;
}): Date | undefined {
const retry: boolean | Date = isRetryableError(error) ?? true;
let result;
if (retry instanceof Date) {
result = retry;
} else if (retry === true) {
result = new Date(Date.now() + calculateDelayBasedOnAttempts(attempts));
}
// Add a duration to the result
if (addDuration && result) {
result = intervalFromDate(result, addDuration)!;
}
return result;
}
export function calculateDelayBasedOnAttempts(attempts: number) {
// Return 30s for the first retry attempt
if (attempts === 1) {
return 30 * 1000;
} else {
const defaultBackoffPerFailure = 5 * 60 * 1000;
const maxDelay = 60 * 60 * 1000;
// For each remaining attempt return an exponential delay with jitter that is capped at 1 hour.
// We adjust the attempts by 2 to ensure that delay starts at 5m for the second retry attempt
// and increases exponentially from there.
return random(Math.min(maxDelay, defaultBackoffPerFailure * Math.pow(2, attempts - 2)));
}
}
export function getTimeout(
task: ConcreteTaskInstance,
taskDefinition: TaskDefinition | undefined
): string {
if (task.schedule) {
return taskDefinition?.timeout ?? DEFAULT_TIMEOUT;
}
return task.timeoutOverride ? task.timeoutOverride : taskDefinition?.timeout ?? DEFAULT_TIMEOUT;
}

View file

@ -222,6 +222,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy,
});
};

View file

@ -6,6 +6,7 @@
*/
import _ from 'lodash';
import sinon from 'sinon';
import { v4 as uuidv4 } from 'uuid';
import { filter, take, toArray } from 'rxjs';
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
@ -55,6 +56,7 @@ jest.mock('../constants', () => ({
],
}));
let fakeTimer: sinon.SinonFakeTimers;
const taskManagerLogger = mockLogger();
beforeEach(() => jest.clearAllMocks());
@ -110,6 +112,12 @@ const taskPartitioner = new TaskPartitioner({
// needs more tests in the similar to the `strategy_default.test.ts` test suite
describe('TaskClaiming', () => {
beforeAll(() => {
fakeTimer = sinon.useFakeTimers();
});
afterAll(() => fakeTimer.restore());
beforeEach(() => {
jest.clearAllMocks();
jest
@ -399,21 +407,27 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[0],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[0].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -492,9 +506,11 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -599,9 +615,11 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -699,9 +717,11 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -847,15 +867,19 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -933,15 +957,19 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -1019,15 +1047,19 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -1118,27 +1150,35 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[0],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[4],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -1236,27 +1276,35 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[0],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[0].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[3],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[3].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -1331,27 +1379,35 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[0],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[0].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[3],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[3].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -1442,27 +1498,35 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[0],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[0].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[3],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[3].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
@ -1535,27 +1599,35 @@ describe('TaskClaiming', () => {
[
{
...fetchedTasks[0],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[0].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[1].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[2].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[3],
attempts: 1,
ownerId: 'test-test',
retryAt: fetchedTasks[3].runAt,
status: 'claiming',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }

View file

@ -18,6 +18,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import apm, { Logger } from 'elastic-apm-node';
import { Subject, Observable } from 'rxjs';
import { omit } from 'lodash';
import { TaskTypeDictionary } from '../task_type_dictionary';
import {
TaskClaimerOpts,
@ -46,6 +47,7 @@ import { TaskStore, SearchOpts } from '../task_store';
import { isOk, asOk } from '../lib/result_type';
import { selectTasksByCapacity } from './lib/task_selector_by_capacity';
import { TaskPartitioner } from '../lib/task_partitioner';
import { getRetryAt } from '../lib/get_retry_at';
interface OwnershipClaimingOpts {
claimOwnershipUntil: Date;
@ -187,16 +189,21 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}
// build the updated task objects we'll claim
const now = new Date();
const taskUpdates: ConcreteTaskInstance[] = [];
for (const task of tasksToRun) {
taskUpdates.push({
...task,
// omits "enabled" field from task updates so we don't overwrite
// any user initiated changes to "enabled" while the task was running
...omit(task, 'enabled'),
scheduledAt:
task.retryAt != null && new Date(task.retryAt).getTime() < Date.now()
? task.retryAt
: task.runAt,
status: TaskStatus.Claiming,
retryAt: claimOwnershipUntil,
status: TaskStatus.Running,
startedAt: now,
attempts: task.attempts + 1,
retryAt: getRetryAt(task, definitions.get(task.taskType)) ?? null,
ownerId: taskStore.taskManagerId,
});
}

View file

@ -35,12 +35,12 @@ import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { usageCountersServiceMock } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counters_service.mock';
import { bufferedTaskStoreMock } from '../buffered_task_store.mock';
import {
calculateDelay,
TASK_MANAGER_RUN_TRANSACTION_TYPE,
TASK_MANAGER_TRANSACTION_TYPE,
TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING,
} from './task_runner';
import { schema } from '@kbn/config-schema';
import { CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY } from '../config';
const baseDelay = 5 * 60 * 1000;
const executionContext = executionContextServiceMock.createSetupContract();
@ -769,6 +769,36 @@ describe('TaskManagerRunner', () => {
expect(instance.enabled).not.toBeDefined();
});
test('skips marking task as running for mget claim strategy', async () => {
const { runner, store } = await pendingStageSetup({
instance: {
schedule: {
interval: '10m',
},
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
strategy: CLAIM_STRATEGY_MGET,
});
const result = await runner.markTaskAsRunning();
expect(result).toBe(true);
expect(apm.startTransaction).not.toHaveBeenCalled();
expect(mockApmTrans.end).not.toHaveBeenCalled();
expect(runner.id).toEqual('foo');
expect(runner.taskType).toEqual('bar');
expect(runner.toString()).toEqual('bar "foo"');
expect(store.update).not.toHaveBeenCalled();
});
describe('TaskEvents', () => {
test('emits TaskEvent when a task is marked as running', async () => {
const id = _.random(1, 20).toString();
@ -2344,26 +2374,6 @@ describe('TaskManagerRunner', () => {
`Error encountered when running onTaskRemoved() hook for testbar2 "foo": Fail`
);
});
describe('calculateDelay', () => {
it('returns 30s on the first attempt', () => {
expect(calculateDelay(1)).toBe(30000);
});
it('returns delay with jitter', () => {
const delay = calculateDelay(5);
// with jitter should be random between 0 and 40 min (inclusive)
expect(delay).toBeGreaterThanOrEqual(0);
expect(delay).toBeLessThanOrEqual(2400000);
});
it('returns delay capped at 1 hour', () => {
const delay = calculateDelay(10);
// with jitter should be random between 0 and 1 hr (inclusive)
expect(delay).toBeGreaterThanOrEqual(0);
expect(delay).toBeLessThanOrEqual(60 * 60 * 1000);
});
});
});
interface TestOpts {
@ -2371,6 +2381,7 @@ describe('TaskManagerRunner', () => {
definitions?: TaskDefinitionRegistry;
onTaskEvent?: jest.Mock<(event: TaskEvent<unknown, unknown>) => void>;
allowReadingInvalidState?: boolean;
strategy?: string;
}
function withAnyTiming(taskRun: TaskRun) {
@ -2447,6 +2458,7 @@ describe('TaskManagerRunner', () => {
warn_threshold: 5000,
},
allowReadingInvalidState: opts.allowReadingInvalidState || false,
strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY,
});
if (stage === TaskRunningStage.READY_TO_RUN) {

View file

@ -14,7 +14,7 @@
import apm from 'elastic-apm-node';
import { v4 as uuidv4 } from 'uuid';
import { withSpan } from '@kbn/apm-utils';
import { defaults, flow, identity, omit, random } from 'lodash';
import { defaults, flow, identity, omit } from 'lodash';
import { ExecutionContextStart, Logger, SavedObjectsErrorHelpers } from '@kbn/core/server';
import { UsageCounter } from '@kbn/usage-collection-plugin/server';
import { Middleware } from '../lib/middleware';
@ -40,7 +40,7 @@ import {
TaskTiming,
TaskManagerStat,
} from '../task_events';
import { intervalFromDate, maxIntervalFromDate } from '../lib/intervals';
import { intervalFromDate } from '../lib/intervals';
import {
CancelFunction,
CancellableTask,
@ -51,12 +51,12 @@ import {
SuccessfulRunResult,
TaskDefinition,
TaskStatus,
DEFAULT_TIMEOUT,
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isRetryableError, isUnrecoverableError } from './errors';
import type { EventLoopDelayConfig } from '../config';
import { isUnrecoverableError } from './errors';
import { CLAIM_STRATEGY_MGET, type EventLoopDelayConfig } from '../config';
import { TaskValidator } from '../task_validator';
import { getRetryAt, getRetryDate, getTimeout } from '../lib/get_retry_at';
export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };
@ -109,6 +109,7 @@ type Opts = {
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
allowReadingInvalidState: boolean;
strategy: string;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;
export enum TaskRunResult {
@ -160,6 +161,7 @@ export class TaskManagerRunner implements TaskRunner {
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
private readonly taskValidator: TaskValidator;
private readonly claimStrategy: string;
/**
* Creates an instance of TaskManagerRunner.
@ -184,6 +186,7 @@ export class TaskManagerRunner implements TaskRunner {
usageCounter,
eventLoopDelayConfig,
allowReadingInvalidState,
strategy,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
@ -202,6 +205,7 @@ export class TaskManagerRunner implements TaskRunner {
definitions: this.definitions,
allowReadingInvalidState,
});
this.claimStrategy = strategy;
}
/**
@ -223,7 +227,9 @@ export class TaskManagerRunner implements TaskRunner {
* @param id
*/
public isSameTask(executionId: string) {
return executionId.startsWith(this.id);
const executionIdParts = executionId.split('::');
const executionIdCompare = executionIdParts.length > 0 ? executionIdParts[0] : executionId;
return executionIdCompare === this.id;
}
/**
@ -266,14 +272,7 @@ export class TaskManagerRunner implements TaskRunner {
* defined by the task type unless this is an ad-hoc task that specifies an override
*/
public get timeout() {
if (this.instance.task.schedule) {
// recurring tasks should use timeout in task type
return this.definition?.timeout ?? DEFAULT_TIMEOUT;
}
return this.instance.task.timeoutOverride
? this.instance.task.timeoutOverride
: this.definition?.timeout ?? DEFAULT_TIMEOUT;
return getTimeout(this.instance.task, this.definition);
}
/**
@ -442,6 +441,13 @@ export class TaskManagerRunner implements TaskRunner {
);
}
// mget claim strategy sets the task to `running` during the claim cycle
// so this update to mark the task as running is unnecessary
if (this.claimStrategy === CLAIM_STRATEGY_MGET) {
this.instance = asReadyToRun(this.instance.task as ConcreteTaskInstanceWithStartedAt);
return true;
}
const apmTrans = apm.startTransaction(
TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING,
TASK_MANAGER_TRANSACTION_TYPE
@ -475,16 +481,7 @@ export class TaskManagerRunner implements TaskRunner {
status: TaskStatus.Running,
startedAt: now,
attempts,
retryAt:
(this.instance.task.schedule
? maxIntervalFromDate(now, this.instance.task.schedule.interval, this.timeout)
: this.getRetryDelay({
attempts,
// Fake an error. This allows retry logic when tasks keep timing out
// and lets us set a proper "retryAt" value each time.
error: new Error('Task timeout'),
addDuration: this.timeout,
})) ?? null,
retryAt: getRetryAt(taskInstance, this.definition) ?? null,
// This is a safe conversion as we're setting the startAt above
},
{ validate: false }
@ -595,7 +592,7 @@ export class TaskManagerRunner implements TaskRunner {
? { schedule }
: // when result.error is truthy, then we're retrying because it failed
{
runAt: this.getRetryDelay({
runAt: getRetryDate({
attempts,
error,
}),
@ -800,31 +797,6 @@ export class TaskManagerRunner implements TaskRunner {
return result;
}
private getRetryDelay({
error,
attempts,
addDuration,
}: {
error: Error;
attempts: number;
addDuration?: string;
}): Date | undefined {
const retry: boolean | Date = isRetryableError(error) ?? true;
let result;
if (retry instanceof Date) {
result = retry;
} else if (retry === true) {
result = new Date(Date.now() + calculateDelay(attempts));
}
// Add a duration to the result
if (addDuration && result) {
result = intervalFromDate(result, addDuration)!;
}
return result;
}
private getMaxAttempts() {
return this.definition?.maxAttempts ?? this.defaultMaxAttempts;
}
@ -883,20 +855,6 @@ export function asRan(task: InstanceOf<TaskRunningStage.RAN, RanTask>): RanTask
};
}
export function calculateDelay(attempts: number) {
// Return 30s for the first retry attempt
if (attempts === 1) {
return 30 * 1000;
} else {
const defaultBackoffPerFailure = 5 * 60 * 1000;
const maxDelay = 60 * 60 * 1000;
// For each remaining attempt return an exponential delay with jitter that is capped at 1 hour.
// We adjust the attempts by 2 to ensure that delay starts at 5m for the second retry attempt
// and increases exponentially from there.
return random(Math.min(maxDelay, defaultBackoffPerFailure * Math.pow(2, attempts - 2)));
}
}
export function getTaskDelayInSeconds(scheduledAt: Date) {
const now = new Date();
return (now.valueOf() - scheduledAt.valueOf()) / 1000;

View file

@ -553,21 +553,6 @@ export default function ({ getService }: FtrProviderContext) {
await releaseTasksWaitingForEventToComplete('releaseSecondWaveOfTasks');
});
it('should increment attempts when task fails on markAsRunning', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
params: { throwOnMarkAsRunning: true },
});
expect(originalTask.attempts).to.eql(0);
// Wait for task manager to attempt running the task a second time
await retry.try(async () => {
const task = await currentTask(originalTask.id);
expect(task.attempts).to.eql(2);
});
});
it('should return a task run error result when trying to run a non-existent task', async () => {
// runSoon should fail
const failedRunSoonResult = await runTaskSoon({