Skip claiming tasks that were modified during the task claiming phase (#198711)

Resolves https://github.com/elastic/kibana/issues/196300

In this PR, I'm removing the fallback we had when `startedAt` value is
missing (https://github.com/elastic/kibana/pull/194759) in favour of
dropping the task document from the claiming cycle. The additional logs
that were added showed that tasks that were missing a `startedAt` value
was because they were being re-created during the exact same time they
were being claimed, causing the task to have an `idle` status and a
missing `startedAt` value. Given the scenario, it feels better to drop
them from getting claimed and to instead try again at the next claim
cycle where the race condition shouldn't occur.
This commit is contained in:
Mike Côté 2024-11-06 22:15:31 -05:00 committed by GitHub
parent 1fa30899ab
commit a19dd8ea97
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 42 additions and 40 deletions

View file

@ -1344,15 +1344,15 @@ describe('TaskClaiming', () => {
expect(result.docs.length).toEqual(3);
});
test('should assign startedAt value if bulkGet returns task with null startedAt', async () => {
test('should skip tasks where bulkGet returns a newer task document than the bulkPartialUpdate', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
mockInstance({ id: `id-4`, taskType: 'report' }),
mockInstance({ id: `id-1`, taskType: 'report', version: '123' }),
mockInstance({ id: `id-2`, taskType: 'report', version: '123' }),
mockInstance({ id: `id-3`, taskType: 'yawn', version: '123' }),
mockInstance({ id: `id-4`, taskType: 'report', version: '123' }),
];
const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
@ -1365,7 +1365,7 @@ describe('TaskClaiming', () => {
);
store.bulkGet.mockResolvedValueOnce([
asOk({ ...fetchedTasks[0], startedAt: new Date() }),
asOk(fetchedTasks[1]),
asOk({ ...fetchedTasks[1], startedAt: new Date(), version: 'abc' }),
asOk({ ...fetchedTasks[2], startedAt: new Date() }),
asOk({ ...fetchedTasks[3], startedAt: new Date() }),
]);
@ -1399,11 +1399,11 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle',
'Task id-2 was modified during the claiming phase, skipping until the next claiming cycle.',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
@ -1463,14 +1463,14 @@ describe('TaskClaiming', () => {
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);
expect(result.stats).toEqual({
tasksClaimed: 4,
tasksConflicted: 0,
tasksClaimed: 3,
tasksConflicted: 1,
tasksErrors: 0,
tasksUpdated: 4,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(4);
expect(result.docs.length).toEqual(3);
for (const r of result.docs) {
expect(r.startedAt).not.toBeNull();
}

View file

@ -195,7 +195,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}
// perform the task object updates, deal with errors
const updatedTaskIds: string[] = [];
const updatedTasks: Record<string, PartialConcreteTaskInstance> = {};
let conflicts = 0;
let bulkUpdateErrors = 0;
let bulkGetErrors = 0;
@ -203,7 +203,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
const updateResults = await taskStore.bulkPartialUpdate(taskUpdates);
for (const updateResult of updateResults) {
if (isOk(updateResult)) {
updatedTaskIds.push(updateResult.value.id);
updatedTasks[updateResult.value.id] = updateResult.value;
} else {
const { id, type, error, status } = updateResult.error;
@ -218,29 +218,23 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}
// perform an mget to get the full task instance for claiming
const fullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce<ConcreteTaskInstance[]>(
(acc, task) => {
if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`);
bulkGetErrors++;
}
return acc;
},
[]
);
// Look for tasks that have a null startedAt value, log them and manually set a startedAt field
for (const task of fullTasksToRun) {
if (task.startedAt == null) {
const fullTasksToRun = (await taskStore.bulkGet(Object.keys(updatedTasks))).reduce<
ConcreteTaskInstance[]
>((acc, task) => {
if (isOk(task) && task.value.version !== updatedTasks[task.value.id].version) {
logger.warn(
`Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}`
`Task ${task.value.id} was modified during the claiming phase, skipping until the next claiming cycle.`
);
task.startedAt = now;
conflicts++;
} else if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`);
bulkGetErrors++;
}
}
return acc;
}, []);
// separate update for removed tasks; shouldn't happen often, so unlikely
// a performance concern, and keeps the rest of the logic simpler

View file

@ -1020,7 +1020,10 @@ describe('TaskStore', () => {
refresh: false,
});
expect(result).toEqual([asOk(task)]);
expect(result).toEqual([
// New version returned after update
asOk({ ...task, version: 'Wzg0LDFd' }),
]);
});
test(`should perform partial update with minimal fields`, async () => {
@ -1062,7 +1065,8 @@ describe('TaskStore', () => {
refresh: false,
});
expect(result).toEqual([asOk(task)]);
// New version returned after update
expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]);
});
test(`should perform partial update with no version`, async () => {
@ -1100,7 +1104,8 @@ describe('TaskStore', () => {
refresh: false,
});
expect(result).toEqual([asOk(task)]);
// New version returned after update
expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]);
});
test(`should gracefully handle errors within the response`, async () => {
@ -1183,7 +1188,8 @@ describe('TaskStore', () => {
});
expect(result).toEqual([
asOk(task1),
// New version returned after update
asOk({ ...task1, version: 'Wzg0LDFd' }),
asErr({
type: 'task',
id: '45343254',
@ -1267,7 +1273,8 @@ describe('TaskStore', () => {
});
expect(result).toEqual([
asOk(task1),
// New version returned after update
asOk({ ...task1, version: 'Wzg0LDFd' }),
asErr({
type: 'task',
id: 'unknown',

View file

@ -26,7 +26,7 @@ import {
ElasticsearchClient,
} from '@kbn/core/server';
import { decodeRequestVersion } from '@kbn/core-saved-objects-base-server-internal';
import { decodeRequestVersion, encodeVersion } from '@kbn/core-saved-objects-base-server-internal';
import { RequestTimeoutsConfig } from './config';
import { asOk, asErr, Result } from './lib/result_type';
@ -427,6 +427,7 @@ export class TaskStore {
return asOk({
...doc,
id: docId,
version: encodeVersion(item.update._seq_no, item.update._primary_term),
});
});
}