Fix bug where mget task update conflicts are not properly handled (#192392)

In this PR, I'm fixing the mget task claimer to properly handle 409 task
update conflicts. Previously they were treated as regular errors: they
followed the regular error path and were reflected in the metrics. With
this change, the metrics will no longer show errors when update
conflicts exist.

## To verify

1. Apply the following diff to your code; this will trigger claim
conflicts 100% of the time
```
diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
index 4eae5d9d707..54b386174f3 100644
--- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
+++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
@@ -194,6 +194,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
       // omits "enabled" field from task updates so we don't overwrite
       // any user initiated changes to "enabled" while the task was running
       ...omit(task, 'enabled'),
+      version: 'WzUwMDAsMTAwMF0=',
       scheduledAt:
         task.retryAt != null && new Date(task.retryAt).getTime() < Date.now()
           ? task.retryAt
@@ -222,6 +223,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
     } else {
       const { id, type, error } = updateResult.error;
       if (error.statusCode === 409) {
+        console.log('*** Conflict');
         conflicts++;
       } else {
         logger.error(`Error updating task ${id}:${type} during claim: ${error.message}`, logMeta);
```
2. Start up Elasticsearch and Kibana
3. Observe conflict messages logged and not `Error updating task...`
4. Pull the task manager metrics (`/api/task_manager/metrics`)
5. Observe all task claims are successful and `total_errors` is `0`
This commit is contained in:
Mike Côté 2024-09-09 14:36:49 -04:00 committed by GitHub
parent caa8472ef8
commit e02e8fc1ff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 130 additions and 14 deletions

View file

@ -1415,6 +1415,134 @@ describe('TaskClaiming', () => {
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);
});
// Verifies that when one entry of the bulkUpdate result is a 409 error, the claimer
// counts it as a conflict: nothing is logged at error level, tasksErrors stays 0,
// and only the successfully updated tasks are claimed.
test('should handle conflict errors when bulk updating the task doc', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
// Four candidate tasks returned by the mocked search.
const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
mockInstance({ id: `id-4`, taskType: 'report' }),
];
const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
// The update of the second task (id-2) fails with a 409; the other three succeed.
store.bulkUpdate.mockResolvedValueOnce([
asOk(fetchedTasks[0]),
// NOTE(review): the suppression below is presumably needed because this minimal
// error object omits fields of the real error type — confirm.
// @ts-expect-error
asErr({
type: 'task',
id: fetchedTasks[1].id,
error: {
statusCode: 409,
message: 'Conflict',
},
}),
asOk(fetchedTasks[2]),
asOk(fetchedTasks[3]),
]);
// Only the three tasks whose update succeeded are returned by the follow-up bulkGet.
store.bulkGet.mockResolvedValueOnce([
asOk(fetchedTasks[0]),
asOk(fetchedTasks[2]),
asOk(fetchedTasks[3]),
]);
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
strategy: CLAIM_STRATEGY_MGET,
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
});
const [resultOrErr] = await getAllAsPromise(
taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() })
);
// If the claim cycle produced an error result, this expectation fails the test and
// shows the unexpected value in the failure output.
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
expect(resultOrErr).toBe(undefined);
}
const result = unwrap(resultOrErr) as ClaimOwnershipResult;
expect(apm.startTransaction).toHaveBeenCalledWith(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
// The 409 shows up under "conflicts", not under "updateErrors".
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['claimAvailableTasksMget'] }
);
// A conflict must not be reported through the error logger.
expect(taskManagerLogger.error).not.toHaveBeenCalled();
expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
size: 40,
seq_no_primary_term: true,
});
expect(store.getDocVersions).toHaveBeenCalledWith([
'task:id-1',
'task:id-2',
'task:id-3',
'task:id-4',
]);
// All four tasks are submitted in a single bulkUpdate call, including id-2 whose
// update is mocked to come back as a conflict.
expect(store.bulkUpdate).toHaveBeenCalledTimes(1);
expect(store.bulkUpdate).toHaveBeenCalledWith(
[
{
...fetchedTasks[0],
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[1],
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[2],
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
...fetchedTasks[3],
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
],
{ validate: false, excludeLargeFields: true }
);
// The conflicted task (id-2) is left out of the follow-up bulkGet.
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-3', 'id-4']);
// One conflict, zero errors, three tasks claimed and updated.
expect(result.stats).toEqual({
tasksClaimed: 3,
tasksConflicted: 1,
tasksErrors: 0,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
});
expect(result.docs.length).toEqual(3);
});
test('should handle individual errors when bulk updating the task doc', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));

View file

@ -13,8 +13,6 @@
// - from the non-stale search results, return as many as we can run based on available
// capacity and the cost of each task type to run
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import apm, { Logger } from 'elastic-apm-node';
import { Subject, Observable } from 'rxjs';
@ -223,18 +221,8 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
updatedTasks.push(updateResult.value);
} else {
const { id, type, error } = updateResult.error;
// this check is needed so error will be typed correctly for isConflictError
if (SavedObjectsErrorHelpers.isSavedObjectsClientError(error)) {
if (SavedObjectsErrorHelpers.isConflictError(error)) {
conflicts++;
} else {
logger.error(
`Saved Object error updating task ${id}:${type} during claim: ${error.error}`,
logMeta
);
bulkUpdateErrors++;
}
if (error.statusCode === 409) {
conflicts++;
} else {
logger.error(`Error updating task ${id}:${type} during claim: ${error.message}`, logMeta);
bulkUpdateErrors++;