[8.x] [Response Ops][Task Manager] Handle errors in `getCapacity` function during task polling (#194759) (#194823)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Response Ops][Task Manager] Handle errors in `getCapacity`
function during task polling
(#194759)](https://github.com/elastic/kibana/pull/194759)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Ying
Mao","email":"ying.mao@elastic.co"},"sourceCommit":{"committedDate":"2024-10-03T11:55:18Z","message":"[Response
Ops][Task Manager] Handle errors in `getCapacity` function during task
polling (#194759)\n\n## Summary\r\n\r\n* Moves the `getCapacity` call
during task polling within the try/catch\r\nso any errors with this
function will be caught and logged under the\r\n`Failed to poll for
work` message and polling continues\r\n* During `mget` claim strategy,
perform a final check after the\r\n`bulkGet` to check for tasks with a
null `startedAt` value. If any tasks\r\nmeet this condition, log some
basic info and manually assign the\r\n`startedAt`. This is a stop-gap
measure to ensure we understand why we\r\nmight be seeing tasks with
null `startedAt` values. In the future we may\r\nchoose to filter out
these tasks from running in this
cycle.","sha":"6827ba4dc58a262c6c2b332b7c155cc037c02475","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Feature:Task
Manager","Team:ResponseOps","v9.0.0","backport:prev-major","v8.16.0"],"title":"[Response
Ops][Task Manager] Handle errors in `getCapacity` function during task
polling","number":194759,"url":"https://github.com/elastic/kibana/pull/194759","mergeCommit":{"message":"[Response
Ops][Task Manager] Handle errors in `getCapacity` function during task
polling (#194759)\n\n## Summary\r\n\r\n* Moves the `getCapacity` call
during task polling within the try/catch\r\nso any errors with this
function will be caught and logged under the\r\n`Failed to poll for
work` message and polling continues\r\n* During `mget` claim strategy,
perform a final check after the\r\n`bulkGet` to check for tasks with a
null `startedAt` value. If any tasks\r\nmeet this condition, log some
basic info and manually assign the\r\n`startedAt`. This is a stop-gap
measure to ensure we understand why we\r\nmight be seeing tasks with
null `startedAt` values. In the future we may\r\nchoose to filter out
these tasks from running in this
cycle.","sha":"6827ba4dc58a262c6c2b332b7c155cc037c02475"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/194759","number":194759,"mergeCommit":{"message":"[Response
Ops][Task Manager] Handle errors in `getCapacity` function during task
polling (#194759)\n\n## Summary\r\n\r\n* Moves the `getCapacity` call
during task polling within the try/catch\r\nso any errors with this
function will be caught and logged under the\r\n`Failed to poll for
work` message and polling continues\r\n* During `mget` claim strategy,
perform a final check after the\r\n`bulkGet` to check for tasks with a
null `startedAt` value. If any tasks\r\nmeet this condition, log some
basic info and manually assign the\r\n`startedAt`. This is a stop-gap
measure to ensure we understand why we\r\nmight be seeing tasks with
null `startedAt` values. In the future we may\r\nchoose to filter out
these tasks from running in this
cycle.","sha":"6827ba4dc58a262c6c2b332b7c155cc037c02475"}},{"branch":"8.x","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Ying Mao <ying.mao@elastic.co>
This commit is contained in:
Kibana Machine 2024-10-04 00:27:04 +10:00 committed by GitHub
parent 023e3ed826
commit 97760d5b59
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 193 additions and 5 deletions

View file

@ -310,6 +310,52 @@ describe('TaskPoller', () => {
expect(handler).toHaveBeenCalledWith(asOk(3));
});
test('continues polling if getCapacity throws error fails', async () => {
const pollInterval = 100;
const handler = jest.fn();
let callCount = 0;
const work = jest.fn(async () => callCount);
const poller = createTaskPoller<string, number>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work,
getCapacity: () => {
callCount++;
if (callCount === 2) {
throw new Error('error getting capacity');
}
return 2;
},
});
poller.events$.subscribe(handler);
poller.start();
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(handler).toHaveBeenCalledWith(asOk(1));
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
const expectedError = new PollingError<string>(
'Failed to poll for work: Error: error getting capacity',
PollingErrorType.WorkError,
none
);
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
expect(handler.mock.calls[1][0].error.type).toEqual(PollingErrorType.WorkError);
expect(handler).not.toHaveBeenCalledWith(asOk(2));
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(handler).toHaveBeenCalledWith(asOk(3));
});
test(`doesn't start polling until start is called`, async () => {
const pollInterval = 100;

View file

@ -61,14 +61,15 @@ export function createTaskPoller<T, H>({
async function runCycle() {
timeoutId = null;
const start = Date.now();
if (hasCapacity()) {
try {
try {
if (hasCapacity()) {
const result = await work();
subject.next(asOk(result));
} catch (e) {
subject.next(asPollingError<T>(e, PollingErrorType.WorkError));
}
} catch (e) {
subject.next(asPollingError<T>(e, PollingErrorType.WorkError));
}
if (running) {
// Set the next runCycle call
timeoutId = setTimeout(

View file

@ -1343,6 +1343,137 @@ describe('TaskClaiming', () => {
expect(result.docs.length).toEqual(3);
});
test('should assign startedAt value if bulkGet returns task with null startedAt', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
mockInstance({ id: `id-4`, taskType: 'report' }),
];
const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
store.bulkPartialUpdate.mockResolvedValueOnce(
[fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[3]].map(
getPartialUpdateResult
)
);
store.bulkGet.mockResolvedValueOnce([
asOk({ ...fetchedTasks[0], startedAt: new Date() }),
asOk(fetchedTasks[1]),
asOk({ ...fetchedTasks[2], startedAt: new Date() }),
asOk({ ...fetchedTasks[3], startedAt: new Date() }),
]);
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
strategy: CLAIM_STRATEGY_MGET,
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
unusedTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
});
const [resultOrErr] = await getAllAsPromise(
taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() })
);
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
expect(resultOrErr).toBe(undefined);
}
const result = unwrap(resultOrErr) as ClaimOwnershipResult;
expect(apm.startTransaction).toHaveBeenCalledWith(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
size: 40,
seq_no_primary_term: true,
});
expect(store.getDocVersions).toHaveBeenCalledWith([
'task:id-1',
'task:id-2',
'task:id-3',
'task:id-4',
]);
expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1);
expect(store.bulkPartialUpdate).toHaveBeenCalledWith([
{
id: fetchedTasks[0].id,
version: fetchedTasks[0].version,
scheduledAt: fetchedTasks[0].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
id: fetchedTasks[1].id,
version: fetchedTasks[1].version,
scheduledAt: fetchedTasks[1].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
id: fetchedTasks[2].id,
version: fetchedTasks[2].version,
scheduledAt: fetchedTasks[2].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
id: fetchedTasks[3].id,
version: fetchedTasks[3].version,
scheduledAt: fetchedTasks[3].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
]);
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);
expect(result.stats).toEqual({
tasksClaimed: 4,
tasksConflicted: 0,
tasksErrors: 0,
tasksUpdated: 4,
tasksLeftUnclaimed: 0,
});
expect(result.docs.length).toEqual(4);
for (const r of result.docs) {
expect(r.startedAt).not.toBeNull();
}
});
test('should throw when error when bulk getting all full task docs', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
@ -2295,7 +2426,7 @@ describe('TaskClaiming', () => {
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: taskManagerId,
startedAt: null,
startedAt: new Date(),
retryAt: null,
scheduledAt: new Date(),
traceparent: 'newParent',

View file

@ -247,6 +247,16 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
[]
);
// Look for tasks that have a null startedAt value, log them and manually set a startedAt field
for (const task of fullTasksToRun) {
if (task.startedAt == null) {
logger.warn(
`Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}`
);
task.startedAt = now;
}
}
// separate update for removed tasks; shouldn't happen often, so unlikely
// a performance concern, and keeps the rest of the logic simpler
let removedCount = 0;