mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
[Task Manager] Increment task attempts
when they fail during markTaskAsRunning (#88669)
When something causes an exception in `TaskRunner.markTaskAsRunning()`, its execution fails, but this happens before we update the SO, which means that this failure does not count towards the `attempts` on the task. Task Manager will continue to try running this task forever. This PR increments the `attempts` when a failure occurs during `TaskRunner.markTaskAsRunning()` to ensure such a task doesn't continue to run indefinitely. Note that this fix will not affect `scheduled` tasks, as they are designed to _ignore_ their `attempts` and run forever. In such a case the task will continue to consume Task Manager resources until canceled, but these failures will be logged and can be identified when needed.
This commit is contained in:
parent
b3bec0d6ef
commit
c89f1f18d3
4 changed files with 191 additions and 30 deletions
|
@ -666,9 +666,7 @@ describe('TaskManagerRunner', () => {
|
|||
|
||||
store.update = sinon
|
||||
.stub()
|
||||
.throws(
|
||||
SavedObjectsErrorHelpers.decorateConflictError(new Error('repo error')).output.payload
|
||||
);
|
||||
.throws(SavedObjectsErrorHelpers.decorateConflictError(new Error('repo error')));
|
||||
|
||||
expect(await runner.markTaskAsRunning()).toEqual(false);
|
||||
});
|
||||
|
@ -699,15 +697,126 @@ describe('TaskManagerRunner', () => {
|
|||
|
||||
store.update = sinon
|
||||
.stub()
|
||||
.throws(SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id').output.payload);
|
||||
.throws(SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id'));
|
||||
|
||||
return expect(runner.markTaskAsRunning()).rejects.toMatchInlineSnapshot(`
|
||||
Object {
|
||||
"error": "Not Found",
|
||||
"message": "Saved object [type/id] not found",
|
||||
"statusCode": 404,
|
||||
}
|
||||
`);
|
||||
return expect(runner.markTaskAsRunning()).rejects.toMatchInlineSnapshot(
|
||||
`[Error: Saved object [type/id] not found]`
|
||||
);
|
||||
});
|
||||
|
||||
test(`it tries to increment a task's attempts when markTaskAsRunning fails for unexpected reasons`, async () => {
|
||||
const id = _.random(1, 20).toString();
|
||||
const initialAttempts = _.random(1, 3);
|
||||
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
|
||||
const timeoutMinutes = 1;
|
||||
const getRetryStub = sinon.stub().returns(nextRetry);
|
||||
const { runner, store } = testOpts({
|
||||
instance: {
|
||||
id,
|
||||
attempts: initialAttempts,
|
||||
schedule: undefined,
|
||||
},
|
||||
definitions: {
|
||||
bar: {
|
||||
title: 'Bar!',
|
||||
timeout: `${timeoutMinutes}m`,
|
||||
getRetry: getRetryStub,
|
||||
createTaskRunner: () => ({
|
||||
run: async () => undefined,
|
||||
}),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
store.update = sinon.stub();
|
||||
store.update.onFirstCall().throws(SavedObjectsErrorHelpers.createBadRequestError('type'));
|
||||
store.update.onSecondCall().resolves();
|
||||
|
||||
await expect(runner.markTaskAsRunning()).rejects.toMatchInlineSnapshot(
|
||||
`[Error: type: Bad Request]`
|
||||
);
|
||||
|
||||
sinon.assert.calledWith(store.update, {
|
||||
...mockInstance({
|
||||
id,
|
||||
attempts: initialAttempts + 1,
|
||||
schedule: undefined,
|
||||
}),
|
||||
status: TaskStatus.Idle,
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
ownerId: null,
|
||||
});
|
||||
});
|
||||
|
||||
test(`it doesnt try to increment a task's attempts when markTaskAsRunning fails for version conflict`, async () => {
|
||||
const id = _.random(1, 20).toString();
|
||||
const initialAttempts = _.random(1, 3);
|
||||
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
|
||||
const timeoutMinutes = 1;
|
||||
const getRetryStub = sinon.stub().returns(nextRetry);
|
||||
const { runner, store } = testOpts({
|
||||
instance: {
|
||||
id,
|
||||
attempts: initialAttempts,
|
||||
schedule: undefined,
|
||||
},
|
||||
definitions: {
|
||||
bar: {
|
||||
title: 'Bar!',
|
||||
timeout: `${timeoutMinutes}m`,
|
||||
getRetry: getRetryStub,
|
||||
createTaskRunner: () => ({
|
||||
run: async () => undefined,
|
||||
}),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
store.update = sinon.stub();
|
||||
store.update.onFirstCall().throws(SavedObjectsErrorHelpers.createConflictError('type', 'id'));
|
||||
store.update.onSecondCall().resolves();
|
||||
|
||||
await expect(runner.markTaskAsRunning()).resolves.toMatchInlineSnapshot(`false`);
|
||||
|
||||
sinon.assert.calledOnce(store.update);
|
||||
});
|
||||
|
||||
test(`it doesnt try to increment a task's attempts when markTaskAsRunning fails due to Saved Object not being found`, async () => {
|
||||
const id = _.random(1, 20).toString();
|
||||
const initialAttempts = _.random(1, 3);
|
||||
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
|
||||
const timeoutMinutes = 1;
|
||||
const getRetryStub = sinon.stub().returns(nextRetry);
|
||||
const { runner, store } = testOpts({
|
||||
instance: {
|
||||
id,
|
||||
attempts: initialAttempts,
|
||||
schedule: undefined,
|
||||
},
|
||||
definitions: {
|
||||
bar: {
|
||||
title: 'Bar!',
|
||||
timeout: `${timeoutMinutes}m`,
|
||||
getRetry: getRetryStub,
|
||||
createTaskRunner: () => ({
|
||||
run: async () => undefined,
|
||||
}),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
store.update = sinon.stub();
|
||||
store.update
|
||||
.onFirstCall()
|
||||
.throws(SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id'));
|
||||
store.update.onSecondCall().resolves();
|
||||
|
||||
await expect(runner.markTaskAsRunning()).rejects.toMatchInlineSnapshot(
|
||||
`[Error: Saved object [type/id] not found]`
|
||||
);
|
||||
|
||||
sinon.assert.calledOnce(store.update);
|
||||
});
|
||||
|
||||
test('uses getRetry (returning true) to set retryAt when defined', async () => {
|
||||
|
@ -1114,12 +1223,8 @@ describe('TaskManagerRunner', () => {
|
|||
};
|
||||
}
|
||||
|
||||
function testOpts(opts: TestOpts) {
|
||||
const callCluster = sinon.stub();
|
||||
const createTaskRunner = sinon.stub();
|
||||
const logger = mockLogger();
|
||||
|
||||
const instance = Object.assign(
|
||||
function mockInstance(instance: Partial<ConcreteTaskInstance> = {}) {
|
||||
return Object.assign(
|
||||
{
|
||||
id: 'foo',
|
||||
taskType: 'bar',
|
||||
|
@ -1137,8 +1242,16 @@ describe('TaskManagerRunner', () => {
|
|||
user: 'example',
|
||||
ownerId: null,
|
||||
},
|
||||
opts.instance || {}
|
||||
instance
|
||||
);
|
||||
}
|
||||
|
||||
function testOpts(opts: TestOpts) {
|
||||
const callCluster = sinon.stub();
|
||||
const createTaskRunner = sinon.stub();
|
||||
const logger = mockLogger();
|
||||
|
||||
const instance = mockInstance(opts.instance);
|
||||
|
||||
const store = {
|
||||
update: sinon.stub(),
|
||||
|
|
|
@ -10,13 +10,23 @@
|
|||
* rescheduling, middleware application, etc.
|
||||
*/
|
||||
|
||||
import { Logger } from 'src/core/server';
|
||||
import apm from 'elastic-apm-node';
|
||||
import { performance } from 'perf_hooks';
|
||||
import { identity, defaults, flow } from 'lodash';
|
||||
import { Logger, SavedObjectsErrorHelpers } from '../../../../../src/core/server';
|
||||
|
||||
import { Middleware } from '../lib/middleware';
|
||||
import { asOk, asErr, mapErr, eitherAsync, unwrap, isOk, mapOk, Result } from '../lib/result_type';
|
||||
import {
|
||||
asOk,
|
||||
asErr,
|
||||
mapErr,
|
||||
eitherAsync,
|
||||
unwrap,
|
||||
isOk,
|
||||
mapOk,
|
||||
Result,
|
||||
promiseResult,
|
||||
} from '../lib/result_type';
|
||||
import {
|
||||
TaskRun,
|
||||
TaskMarkRunning,
|
||||
|
@ -226,17 +236,15 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
'taskManager'
|
||||
);
|
||||
|
||||
const VERSION_CONFLICT_STATUS = 409;
|
||||
const now = new Date();
|
||||
|
||||
const { taskInstance } = await this.beforeMarkRunning({
|
||||
taskInstance: this.instance,
|
||||
});
|
||||
|
||||
const attempts = taskInstance.attempts + 1;
|
||||
const ownershipClaimedUntil = taskInstance.retryAt;
|
||||
|
||||
try {
|
||||
const { taskInstance } = await this.beforeMarkRunning({
|
||||
taskInstance: this.instance,
|
||||
});
|
||||
|
||||
const attempts = taskInstance.attempts + 1;
|
||||
const ownershipClaimedUntil = taskInstance.retryAt;
|
||||
|
||||
const { id } = taskInstance;
|
||||
|
||||
const timeUntilClaimExpires = howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil);
|
||||
|
@ -284,7 +292,16 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
if (apmTrans) apmTrans.end('failure');
|
||||
performanceStopMarkingTaskAsRunning();
|
||||
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asErr(error)));
|
||||
if (error.statusCode !== VERSION_CONFLICT_STATUS) {
|
||||
if (!SavedObjectsErrorHelpers.isConflictError(error)) {
|
||||
if (!SavedObjectsErrorHelpers.isNotFoundError(error)) {
|
||||
// try to release claim as an unknown failure prevented us from marking as running
|
||||
mapErr((errReleaseClaim: Error) => {
|
||||
this.logger.error(
|
||||
`[Task Runner] Task ${this.instance.id} failed to release claim after failure: ${errReleaseClaim}`
|
||||
);
|
||||
}, await this.releaseClaimAndIncrementAttempts());
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
@ -314,6 +331,19 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
: asOk(result || EMPTY_RUN_RESULT);
|
||||
}
|
||||
|
||||
private async releaseClaimAndIncrementAttempts(): Promise<Result<ConcreteTaskInstance, Error>> {
|
||||
return promiseResult(
|
||||
this.bufferedTaskStore.update({
|
||||
...this.instance,
|
||||
status: TaskStatus.Idle,
|
||||
attempts: this.instance.attempts + 1,
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
ownerId: null,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
private shouldTryToScheduleRetry(): boolean {
|
||||
if (this.instance.schedule) {
|
||||
return true;
|
||||
|
|
|
@ -167,6 +167,9 @@ export class SampleTaskManagerFixturePlugin
|
|||
},
|
||||
|
||||
async beforeMarkRunning(context) {
|
||||
if (context.taskInstance?.params?.originalParams?.throwOnMarkAsRunning) {
|
||||
throw new Error(`Sample task ${context.taskInstance.id} threw on MarkAsRunning`);
|
||||
}
|
||||
return context;
|
||||
},
|
||||
});
|
||||
|
|
|
@ -524,6 +524,21 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
});
|
||||
});
|
||||
|
||||
it('should increment attempts when task fails on markAsRunning', async () => {
|
||||
const originalTask = await scheduleTask({
|
||||
taskType: 'sampleTask',
|
||||
params: { throwOnMarkAsRunning: true },
|
||||
});
|
||||
|
||||
await delay(DEFAULT_POLL_INTERVAL * 3);
|
||||
|
||||
await retry.try(async () => {
|
||||
const task = await currentTask(originalTask.id);
|
||||
expect(task.attempts).to.eql(3);
|
||||
expect(task.status).to.eql('failed');
|
||||
});
|
||||
});
|
||||
|
||||
it('should return a task run error result when trying to run a non-existent task', async () => {
|
||||
// runNow should fail
|
||||
const failedRunNowResult = await runTaskNow({
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue