[Response Ops][Task Manager] Emitting error metric when task update fails (#191307)

Resolves https://github.com/elastic/kibana/issues/184173

## Summary

Catches errors updating the task from the `taskStore.bulkUpdate`
function and emitting an error count so these errors are reflected in
the metrics.

## To Verify

1. Add the following to force an error when running an example rule:

```
--- a/x-pack/plugins/task_manager/server/task_store.ts
+++ b/x-pack/plugins/task_manager/server/task_store.ts
@@ -24,6 +24,7 @@ import {
   ISavedObjectsRepository,
   SavedObjectsUpdateResponse,
   ElasticsearchClient,
+  SavedObjectsErrorHelpers,
 } from '@kbn/core/server';

 import { RequestTimeoutsConfig } from './config';
@@ -309,6 +310,16 @@ export class TaskStore {
       this.logger.warn(`Skipping validation for bulk update because excludeLargeFields=true.`);
     }

+    const isProcessResult = docs.some(
+      (doc) =>
+        doc.taskType === 'alerting:example.always-firing' &&
+        doc.status === 'idle' &&
+        doc.retryAt === null
+    );
+    if (isProcessResult) {
+      throw SavedObjectsErrorHelpers.decorateEsUnavailableError(new Error('test'));
+    }
+
     const attributesByDocId = docs.reduce((attrsById, doc) => {
```

2. Create an `example.always-firing` rule and let it run. You should see
an error in the logs:
```
[2024-08-26T14:44:07.065-04:00][ERROR][plugins.taskManager] Task alerting:example.always-firing "80b8481d-7bfc-4d38-a31b-7a559fbe846b" failed: Error: test
```

3. Navigate to
https://localhost:5601/api/task_manager/metrics?reset=false and you
should see a framework error underneath the overall metrics and the
alerting metrics.

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2024-08-28 13:22:27 -04:00 committed by GitHub
parent 3aa745f113
commit dafce9016c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 129 additions and 24 deletions

View file

@ -2061,6 +2061,97 @@ describe('TaskManagerRunner', () => {
); );
expect(onTaskEvent).toHaveBeenCalledTimes(2); expect(onTaskEvent).toHaveBeenCalledTimes(2);
}); });
test('emits TaskEvent when failing to update a recurring task', async () => {
const id = _.random(1, 20).toString();
const runAt = minutesFromNow(_.random(5));
const onTaskEvent = jest.fn();
const { runner, instance, store } = await readyToRunStageSetup({
onTaskEvent,
instance: {
id,
schedule: { interval: '1m' },
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { runAt, state: {} };
},
}),
},
},
});
const error = new Error('fail');
store.update.mockImplementation(() => {
throw error;
});
await expect(runner.run()).rejects.toThrowError('fail');
expect(onTaskEvent).toHaveBeenCalledWith(
withAnyTiming(
asTaskRunEvent(
id,
asErr({
task: instance,
persistence: TaskPersistence.Recurring,
result: TaskRunResult.Failed,
isExpired: false,
error,
})
)
)
);
});
test('emits TaskEvent when failing to update a non-recurring task', async () => {
const id = _.random(1, 20).toString();
const runAt = minutesFromNow(_.random(5));
const onTaskEvent = jest.fn();
const { runner, instance, store } = await readyToRunStageSetup({
onTaskEvent,
instance: {
id,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { runAt, state: {} };
},
}),
},
},
});
const error = new Error('fail');
store.update.mockImplementation(() => {
throw error;
});
await expect(runner.run()).rejects.toThrowError('fail');
expect(onTaskEvent).toHaveBeenCalledWith(
withAnyTiming(
asTaskRunEvent(
id,
asErr({
task: instance,
persistence: TaskPersistence.NonRecurring,
result: TaskRunResult.Failed,
isExpired: false,
error,
})
)
)
);
});
}); });
test('does not update saved object if task expires', async () => { test('does not update saved object if task expires', async () => {

View file

@ -719,40 +719,54 @@ export class TaskManagerRunner implements TaskRunner {
await eitherAsync( await eitherAsync(
result, result,
async ({ runAt, schedule, taskRunError }: SuccessfulRunResult) => { async ({ runAt, schedule, taskRunError }: SuccessfulRunResult) => {
const processedResult = { const taskPersistence =
task, schedule || task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring;
persistence: try {
schedule || task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring, const processedResult = {
result: await (runAt || schedule || task.schedule task,
? this.processResultForRecurringTask(result) persistence: taskPersistence,
: this.processResultWhenDone()), result: await (runAt || schedule || task.schedule
}; ? this.processResultForRecurringTask(result)
: this.processResultWhenDone()),
};
// Alerting task runner returns SuccessfulRunResult with taskRunError // Alerting task runner returns SuccessfulRunResult with taskRunError
// when the alerting task fails, so we check for this condition in order // when the alerting task fails, so we check for this condition in order
// to emit the correct task run event for metrics collection // to emit the correct task run event for metrics collection
// taskRunError contains the "source" (TaskErrorSource) data // taskRunError contains the "source" (TaskErrorSource) data
if (!!taskRunError) { if (!!taskRunError) {
debugLogger.debug(`Emitting task run failed event for task ${this.taskType}`); debugLogger.debug(`Emitting task run failed event for task ${this.taskType}`);
this.onTaskEvent(
asTaskRunEvent(
this.id,
asErr({ ...processedResult, isExpired: taskHasExpired, error: taskRunError }),
taskTiming
)
);
} else {
this.onTaskEvent(
asTaskRunEvent(
this.id,
asOk({ ...processedResult, isExpired: taskHasExpired }),
taskTiming
)
);
}
} catch (err) {
this.onTaskEvent( this.onTaskEvent(
asTaskRunEvent( asTaskRunEvent(
this.id, this.id,
asErr({ asErr({
...processedResult, task,
persistence: taskPersistence,
result: TaskRunResult.Failed,
isExpired: taskHasExpired, isExpired: taskHasExpired,
error: taskRunError, error: err,
}), }),
taskTiming taskTiming
) )
); );
} else { throw err;
this.onTaskEvent(
asTaskRunEvent(
this.id,
asOk({ ...processedResult, isExpired: taskHasExpired }),
taskTiming
)
);
} }
}, },
async ({ error }: FailedRunResult) => { async ({ error }: FailedRunResult) => {