mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
[Response Ops][Task Manager] Ensure mget
claim errors are correctly reflected in task claim metrics (#191309)
Resolves https://github.com/elastic/kibana/issues/190082 ## Summary This PR ensures that any errors during the `mget` task claim process are accurately reflected in the task manager metrics. * Removed try/catch statements within the `mget` claim function so any errors updating/getting the task docs get bubbled up to the polling lifecycle code. This ensures that these errors get properly reporting using existing mechanisms * Reporting any errors inside the `mget` task claim process where individual documents may fail to update even if other bulk operations succeed. ## Verify 1. Verify that errors thrown within the `mget` claim process are reflected in the metrics. To do this, you can throw an error in each of the following functions used during the claim cycle: * `taskStore.msearch` * `taskStore.getDocVersions` * `taskStore.bulkUpdate` * `taskStore.bulkGet` 2. Verify that if `taskStore.bulkUpdate` or `taskStore.bulkGet` return successfully but contain errors within the response, they are reflected as task claim failures in the metrics. --------- Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
4bf2f97451
commit
ad816b0bf3
5 changed files with 217 additions and 117 deletions
|
@ -24,6 +24,7 @@ import { TaskCost } from './task';
|
|||
import { CLAIM_STRATEGY_MGET, DEFAULT_KIBANAS_PER_PARTITION } from './config';
|
||||
import { TaskPartitioner } from './lib/task_partitioner';
|
||||
import { KibanaDiscoveryService } from './kibana_discovery_service';
|
||||
import { TaskEventType } from './task_events';
|
||||
|
||||
const executionContext = executionContextServiceMock.createSetupContract();
|
||||
let mockTaskClaiming = taskClaimingMock.create({});
|
||||
|
@ -435,6 +436,130 @@ describe('TaskPollingLifecycle', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('pollingLifecycleEvents events', () => {
|
||||
test('should emit success event when polling is successful', async () => {
|
||||
clock.restore();
|
||||
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() =>
|
||||
of(
|
||||
asOk({
|
||||
docs: [],
|
||||
stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 },
|
||||
})
|
||||
)
|
||||
);
|
||||
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
|
||||
const taskPollingLifecycle = new TaskPollingLifecycle({
|
||||
...taskManagerOpts,
|
||||
elasticsearchAndSOAvailability$,
|
||||
});
|
||||
|
||||
const emittedEvents: TaskLifecycleEvent[] = [];
|
||||
|
||||
taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
|
||||
emittedEvents.push(event)
|
||||
);
|
||||
|
||||
elasticsearchAndSOAvailability$.next(true);
|
||||
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
|
||||
await retryUntil('pollingCycleEvent emitted', () => {
|
||||
return !!emittedEvents.find(
|
||||
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
|
||||
);
|
||||
});
|
||||
|
||||
const pollingCycleEvent = emittedEvents.find(
|
||||
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
|
||||
);
|
||||
|
||||
expect(pollingCycleEvent!.event).toEqual({
|
||||
tag: 'ok',
|
||||
value: {
|
||||
result: 'NoTasksClaimed',
|
||||
stats: {
|
||||
tasksUpdated: 0,
|
||||
tasksConflicted: 0,
|
||||
tasksClaimed: 0,
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
test('should emit failure event when polling error occurs', async () => {
|
||||
clock.restore();
|
||||
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => {
|
||||
throw new Error('booo');
|
||||
});
|
||||
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
|
||||
const taskPollingLifecycle = new TaskPollingLifecycle({
|
||||
...taskManagerOpts,
|
||||
elasticsearchAndSOAvailability$,
|
||||
});
|
||||
|
||||
const emittedEvents: TaskLifecycleEvent[] = [];
|
||||
|
||||
taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
|
||||
emittedEvents.push(event)
|
||||
);
|
||||
|
||||
elasticsearchAndSOAvailability$.next(true);
|
||||
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
|
||||
await retryUntil('pollingCycleEvent emitted', () => {
|
||||
return !!emittedEvents.find(
|
||||
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
|
||||
);
|
||||
});
|
||||
|
||||
const pollingCycleEvent = emittedEvents.find(
|
||||
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
|
||||
);
|
||||
|
||||
expect(pollingCycleEvent!.event).toEqual({
|
||||
tag: 'err',
|
||||
error: new Error(`Failed to poll for work: Error: booo`),
|
||||
});
|
||||
});
|
||||
|
||||
test('should emit failure event when polling is successful but individual task errors reported', async () => {
|
||||
clock.restore();
|
||||
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() =>
|
||||
of(
|
||||
asOk({
|
||||
docs: [],
|
||||
stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0, tasksErrors: 2 },
|
||||
})
|
||||
)
|
||||
);
|
||||
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
|
||||
const taskPollingLifecycle = new TaskPollingLifecycle({
|
||||
...taskManagerOpts,
|
||||
elasticsearchAndSOAvailability$,
|
||||
});
|
||||
|
||||
const emittedEvents: TaskLifecycleEvent[] = [];
|
||||
|
||||
taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
|
||||
emittedEvents.push(event)
|
||||
);
|
||||
|
||||
elasticsearchAndSOAvailability$.next(true);
|
||||
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
|
||||
await retryUntil('pollingCycleEvent emitted', () => {
|
||||
return !!emittedEvents.find(
|
||||
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
|
||||
);
|
||||
});
|
||||
|
||||
const pollingCycleEvent = emittedEvents.find(
|
||||
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
|
||||
);
|
||||
|
||||
expect(pollingCycleEvent!.event).toEqual({
|
||||
tag: 'err',
|
||||
error: new Error(`Partially failed to poll for work: some tasks could not be claimed.`),
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
function getFirstAsPromise<T>(obs$: Observable<T>): Promise<T> {
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
import { Subject, Observable } from 'rxjs';
|
||||
import { pipe } from 'fp-ts/lib/pipeable';
|
||||
import { map as mapOptional } from 'fp-ts/lib/Option';
|
||||
import { map as mapOptional, none } from 'fp-ts/lib/Option';
|
||||
import { tap } from 'rxjs';
|
||||
import { UsageCounter } from '@kbn/usage-collection-plugin/server';
|
||||
import type { Logger, ExecutionContextStart } from '@kbn/core/server';
|
||||
|
@ -312,7 +312,21 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
|
|||
this.emitEvent(
|
||||
map(
|
||||
result,
|
||||
({ timing, ...event }) => asTaskPollingCycleEvent<string>(asOk(event), timing),
|
||||
({ timing, ...event }) => {
|
||||
const anyTaskErrors = event.stats?.tasksErrors ?? 0;
|
||||
if (anyTaskErrors > 0) {
|
||||
return asTaskPollingCycleEvent<string>(
|
||||
asErr(
|
||||
new PollingError<string>(
|
||||
'Partially failed to poll for work: some tasks could not be claimed.',
|
||||
PollingErrorType.WorkError,
|
||||
none
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
return asTaskPollingCycleEvent<string>(asOk(event), timing);
|
||||
},
|
||||
(event) => asTaskPollingCycleEvent<string>(asErr(event))
|
||||
)
|
||||
);
|
||||
|
|
|
@ -39,6 +39,7 @@ export interface ClaimOwnershipResult {
|
|||
tasksConflicted: number;
|
||||
tasksClaimed: number;
|
||||
tasksLeftUnclaimed?: number;
|
||||
tasksErrors?: number;
|
||||
};
|
||||
docs: ConcreteTaskInstance[];
|
||||
timing?: TaskTiming;
|
||||
|
|
|
@ -377,7 +377,7 @@ describe('TaskClaiming', () => {
|
|||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; removed: 0;',
|
||||
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
||||
|
@ -422,6 +422,7 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 3,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 0,
|
||||
tasksUpdated: 3,
|
||||
tasksLeftUnclaimed: 3,
|
||||
});
|
||||
|
@ -475,7 +476,7 @@ describe('TaskClaiming', () => {
|
|||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; removed: 2;',
|
||||
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 2;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
||||
|
@ -516,6 +517,7 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 1,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 0,
|
||||
tasksUpdated: 1,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
|
@ -581,7 +583,7 @@ describe('TaskClaiming', () => {
|
|||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; removed: 1;',
|
||||
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 1;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
||||
|
@ -622,6 +624,7 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 1,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 0,
|
||||
tasksUpdated: 1,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
|
@ -679,7 +682,7 @@ describe('TaskClaiming', () => {
|
|||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; removed: 0;',
|
||||
'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
||||
|
@ -720,6 +723,7 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 1,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 0,
|
||||
tasksUpdated: 1,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
|
@ -828,7 +832,7 @@ describe('TaskClaiming', () => {
|
|||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; removed: 0;',
|
||||
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
||||
|
@ -860,6 +864,7 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 2,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 0,
|
||||
tasksUpdated: 2,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
|
@ -913,7 +918,7 @@ describe('TaskClaiming', () => {
|
|||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; removed: 0;',
|
||||
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
||||
|
@ -945,6 +950,7 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 2,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 0,
|
||||
tasksUpdated: 2,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
|
@ -998,7 +1004,7 @@ describe('TaskClaiming', () => {
|
|||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 2; stale: 1; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; removed: 0;',
|
||||
'task claimer claimed: 2; stale: 1; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
||||
|
@ -1030,6 +1036,7 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 2,
|
||||
tasksConflicted: 1,
|
||||
tasksErrors: 0,
|
||||
tasksUpdated: 2,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
|
@ -1089,7 +1096,7 @@ describe('TaskClaiming', () => {
|
|||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; removed: 0;',
|
||||
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
||||
|
@ -1140,6 +1147,7 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 4,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 0,
|
||||
tasksUpdated: 4,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
|
@ -1204,10 +1212,10 @@ describe('TaskClaiming', () => {
|
|||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; removed: 0;',
|
||||
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
|
||||
expect(taskManagerLogger.error).toHaveBeenCalledWith(
|
||||
'Error getting full task id-2:task during claim: Oh no',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
@ -1257,13 +1265,14 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 3,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 1,
|
||||
tasksUpdated: 3,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
expect(result.docs.length).toEqual(3);
|
||||
});
|
||||
|
||||
test('should handle error when bulk getting all full task docs', async () => {
|
||||
test('should throw when error when bulk getting all full task docs', async () => {
|
||||
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
|
||||
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
|
||||
|
||||
|
@ -1294,30 +1303,17 @@ describe('TaskClaiming', () => {
|
|||
taskPartitioner,
|
||||
});
|
||||
|
||||
const [resultOrErr] = await getAllAsPromise(
|
||||
taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() })
|
||||
);
|
||||
|
||||
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
|
||||
expect(resultOrErr).toBe(undefined);
|
||||
}
|
||||
|
||||
const result = unwrap(resultOrErr) as ClaimOwnershipResult;
|
||||
await expect(() =>
|
||||
getAllAsPromise(
|
||||
taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() })
|
||||
)
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"oh no"`);
|
||||
|
||||
expect(apm.startTransaction).toHaveBeenCalledWith(
|
||||
TASK_MANAGER_MARK_AS_CLAIMED,
|
||||
TASK_MANAGER_TRANSACTION_TYPE
|
||||
);
|
||||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 0; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
|
||||
'Error getting full task documents during claim: Error: oh no',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
expect(mockApmTrans.end).toHaveBeenCalledWith('failure');
|
||||
|
||||
expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
|
||||
size: 40,
|
||||
|
@ -1360,14 +1356,6 @@ describe('TaskClaiming', () => {
|
|||
{ validate: false, excludeLargeFields: true }
|
||||
);
|
||||
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);
|
||||
|
||||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 0,
|
||||
tasksConflicted: 0,
|
||||
tasksUpdated: 0,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
expect(result.docs.length).toEqual(0);
|
||||
});
|
||||
|
||||
test('should handle individual errors when bulk updating the task doc', async () => {
|
||||
|
@ -1430,10 +1418,10 @@ describe('TaskClaiming', () => {
|
|||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; removed: 0;',
|
||||
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
|
||||
expect(taskManagerLogger.error).toHaveBeenCalledWith(
|
||||
'Error updating task id-2:task during claim: Oh no',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
|
@ -1483,13 +1471,14 @@ describe('TaskClaiming', () => {
|
|||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 3,
|
||||
tasksConflicted: 0,
|
||||
tasksErrors: 1,
|
||||
tasksUpdated: 3,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
expect(result.docs.length).toEqual(3);
|
||||
});
|
||||
|
||||
test('should handle error when bulk updating all task docs', async () => {
|
||||
test('should throw error when error bulk updating all task docs', async () => {
|
||||
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
|
||||
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
|
||||
|
||||
|
@ -1518,30 +1507,17 @@ describe('TaskClaiming', () => {
|
|||
taskPartitioner,
|
||||
});
|
||||
|
||||
const [resultOrErr] = await getAllAsPromise(
|
||||
taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() })
|
||||
);
|
||||
|
||||
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
|
||||
expect(resultOrErr).toBe(undefined);
|
||||
}
|
||||
|
||||
const result = unwrap(resultOrErr) as ClaimOwnershipResult;
|
||||
await expect(() =>
|
||||
getAllAsPromise(
|
||||
taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() })
|
||||
)
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"oh no"`);
|
||||
|
||||
expect(apm.startTransaction).toHaveBeenCalledWith(
|
||||
TASK_MANAGER_MARK_AS_CLAIMED,
|
||||
TASK_MANAGER_TRANSACTION_TYPE
|
||||
);
|
||||
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
|
||||
|
||||
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
|
||||
'task claimer claimed: 0; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; removed: 0;',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
|
||||
'Error updating tasks during claim: Error: oh no',
|
||||
{ tags: ['claimAvailableTasksMget'] }
|
||||
);
|
||||
expect(mockApmTrans.end).toHaveBeenCalledWith('failure');
|
||||
|
||||
expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
|
||||
size: 40,
|
||||
|
@ -1583,15 +1559,7 @@ describe('TaskClaiming', () => {
|
|||
],
|
||||
{ validate: false, excludeLargeFields: true }
|
||||
);
|
||||
expect(store.bulkGet).toHaveBeenCalledWith([]);
|
||||
|
||||
expect(result.stats).toEqual({
|
||||
tasksClaimed: 0,
|
||||
tasksConflicted: 0,
|
||||
tasksUpdated: 0,
|
||||
tasksLeftUnclaimed: 0,
|
||||
});
|
||||
expect(result.docs.length).toEqual(0);
|
||||
expect(store.bulkGet).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('it should filter for specific partitions and tasks without partitions', async () => {
|
||||
|
|
|
@ -202,60 +202,50 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
|
|||
// perform the task object updates, deal with errors
|
||||
const updatedTasks: ConcreteTaskInstance[] = [];
|
||||
let conflicts = staleTasks.length;
|
||||
let bulkErrors = 0;
|
||||
let bulkUpdateErrors = 0;
|
||||
let bulkGetErrors = 0;
|
||||
|
||||
try {
|
||||
const updateResults = await taskStore.bulkUpdate(taskUpdates, {
|
||||
validate: false,
|
||||
excludeLargeFields: true,
|
||||
});
|
||||
for (const updateResult of updateResults) {
|
||||
if (isOk(updateResult)) {
|
||||
updatedTasks.push(updateResult.value);
|
||||
} else {
|
||||
const { id, type, error } = updateResult.error;
|
||||
const updateResults = await taskStore.bulkUpdate(taskUpdates, {
|
||||
validate: false,
|
||||
excludeLargeFields: true,
|
||||
});
|
||||
for (const updateResult of updateResults) {
|
||||
if (isOk(updateResult)) {
|
||||
updatedTasks.push(updateResult.value);
|
||||
} else {
|
||||
const { id, type, error } = updateResult.error;
|
||||
|
||||
// this check is needed so error will be typed correctly for isConflictError
|
||||
if (SavedObjectsErrorHelpers.isSavedObjectsClientError(error)) {
|
||||
if (SavedObjectsErrorHelpers.isConflictError(error)) {
|
||||
conflicts++;
|
||||
} else {
|
||||
logger.warn(
|
||||
`Saved Object error updating task ${id}:${type} during claim: ${error.error}`,
|
||||
logMeta
|
||||
);
|
||||
bulkErrors++;
|
||||
}
|
||||
// this check is needed so error will be typed correctly for isConflictError
|
||||
if (SavedObjectsErrorHelpers.isSavedObjectsClientError(error)) {
|
||||
if (SavedObjectsErrorHelpers.isConflictError(error)) {
|
||||
conflicts++;
|
||||
} else {
|
||||
logger.warn(`Error updating task ${id}:${type} during claim: ${error.message}`, logMeta);
|
||||
bulkErrors++;
|
||||
logger.error(
|
||||
`Saved Object error updating task ${id}:${type} during claim: ${error.error}`,
|
||||
logMeta
|
||||
);
|
||||
bulkUpdateErrors++;
|
||||
}
|
||||
} else {
|
||||
logger.error(`Error updating task ${id}:${type} during claim: ${error.message}`, logMeta);
|
||||
bulkUpdateErrors++;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn(`Error updating tasks during claim: ${err}`, logMeta);
|
||||
}
|
||||
|
||||
// perform an mget to get the full task instance for claiming
|
||||
let fullTasksToRun: ConcreteTaskInstance[] = [];
|
||||
try {
|
||||
fullTasksToRun = (await taskStore.bulkGet(updatedTasks.map((task) => task.id))).reduce<
|
||||
ConcreteTaskInstance[]
|
||||
>((acc, task) => {
|
||||
if (isOk(task)) {
|
||||
acc.push(task.value);
|
||||
} else {
|
||||
const { id, type, error } = task.error;
|
||||
logger.warn(
|
||||
`Error getting full task ${id}:${type} during claim: ${error.message}`,
|
||||
logMeta
|
||||
);
|
||||
}
|
||||
return acc;
|
||||
}, []);
|
||||
} catch (err) {
|
||||
logger.warn(`Error getting full task documents during claim: ${err}`, logMeta);
|
||||
}
|
||||
const fullTasksToRun = (await taskStore.bulkGet(updatedTasks.map((task) => task.id))).reduce<
|
||||
ConcreteTaskInstance[]
|
||||
>((acc, task) => {
|
||||
if (isOk(task)) {
|
||||
acc.push(task.value);
|
||||
} else {
|
||||
const { id, type, error } = task.error;
|
||||
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`, logMeta);
|
||||
bulkGetErrors++;
|
||||
}
|
||||
return acc;
|
||||
}, []);
|
||||
|
||||
// separate update for removed tasks; shouldn't happen often, so unlikely
|
||||
// a performance concern, and keeps the rest of the logic simpler
|
||||
|
@ -284,12 +274,13 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
|
|||
}
|
||||
}
|
||||
} catch (err) {
|
||||
// swallow the error because this is unrelated to the claim cycle
|
||||
logger.warn(`Error updating tasks to mark as unrecognized during claim: ${err}`, logMeta);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: need a better way to generate stats
|
||||
const message = `task claimer claimed: ${fullTasksToRun.length}; stale: ${staleTasks.length}; conflicts: ${conflicts}; missing: ${missingTasks.length}; capacity reached: ${leftOverTasks.length}; updateErrors: ${bulkErrors}; removed: ${removedCount};`;
|
||||
const message = `task claimer claimed: ${fullTasksToRun.length}; stale: ${staleTasks.length}; conflicts: ${conflicts}; missing: ${missingTasks.length}; capacity reached: ${leftOverTasks.length}; updateErrors: ${bulkUpdateErrors}; getErrors: ${bulkGetErrors}; removed: ${removedCount};`;
|
||||
logger.debug(message, logMeta);
|
||||
|
||||
// build results
|
||||
|
@ -299,6 +290,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
|
|||
tasksConflicted: conflicts,
|
||||
tasksClaimed: fullTasksToRun.length,
|
||||
tasksLeftUnclaimed: leftOverTasks.length,
|
||||
tasksErrors: bulkUpdateErrors + bulkGetErrors,
|
||||
},
|
||||
docs: fullTasksToRun,
|
||||
timing: stopTaskTimer(),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue