Mirror of https://github.com/elastic/kibana.git (synced 2025-04-23 09:19:04 -04:00)
Improve Task Manager instrumentation (#99160)
* Instrument task manager
* Don't refresh after SO updates
* Update snapshot test, remove beforeRun instrumentation
* Revert "Don't refresh after SO updates"
This reverts commit 54f848dc0c.
* Fix task_store unit test
* Adding tests and updating ConcreteTaskInstance interface with traceparent
* Reverting unnecessary changes
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Ying Mao <ying.mao@elastic.co>
Parent: a5949bf3da
Commit: c79a29a08e
11 changed files with 407 additions and 148 deletions
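Before the per-file hunks, the shape of the change in miniature: a task document now persists the APM traceparent of whatever transaction scheduled it, and the task runner starts its own transaction as a child of that trace. The sketch below is a simplification with hypothetical types (TaskDoc is not a real interface in the plugin; the real wiring is in the task_scheduling and task_runner hunks further down):

import agent from 'elastic-apm-node';

// Hypothetical, trimmed-down task document; the real type is ConcreteTaskInstance.
interface TaskDoc {
  taskType: string;
  traceparent: string;
}

// Schedule side: capture the traceparent of the currently active transaction, if any.
function withCurrentTraceparent(taskType: string): TaskDoc {
  return { taskType, traceparent: agent.currentTraceparent ?? '' };
}

// Run side: start the task's transaction as a child of the scheduling trace.
function startRunTransaction(task: TaskDoc) {
  return agent.startTransaction(task.taskType, 'taskManager run', {
    childOf: task.traceparent,
  });
}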
@@ -16,6 +16,8 @@ export interface SpanOptions {
  labels?: Record<string, string>;
}

type Span = Exclude<typeof agent.currentSpan, undefined | null>;

export function parseSpanOptions(optionsOrName: SpanOptions | string) {
  const options = typeof optionsOrName === 'string' ? { name: optionsOrName } : optionsOrName;

@@ -30,7 +32,7 @@ const runInNewContext = <T extends (...args: any[]) => any>(cb: T): ReturnType<T

export async function withSpan<T>(
  optionsOrName: SpanOptions | string,
  cb: () => Promise<T>
  cb: (span?: Span) => Promise<T>
): Promise<T> {
  const options = parseSpanOptions(optionsOrName);

@@ -71,13 +73,17 @@ export async function withSpan<T>(
    span.addLabels(labels);
  }

  return cb()
  return cb(span)
    .then((res) => {
      span.outcome = 'success';
      if (!span.outcome || span.outcome === 'unknown') {
        span.outcome = 'success';
      }
      return res;
    })
    .catch((err) => {
      span.outcome = 'failure';
      if (!span.outcome || span.outcome === 'unknown') {
        span.outcome = 'failure';
      }
      throw err;
    })
    .finally(() => {
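The hunks above are the @kbn/apm-utils side of the change: withSpan now hands the active span to the callback and only assigns an outcome if the callback left it unset (or 'unknown'). A minimal usage sketch — the span name, label, and the fetchThing helper are illustrative, not part of this commit:

import { withSpan } from '@kbn/apm-utils';

// Hypothetical async work worth tracing.
async function fetchThing(id: string): Promise<string | undefined> {
  return id === 'missing' ? undefined : `thing:${id}`;
}

async function loadThing(id: string) {
  return withSpan({ name: 'load thing', type: 'example' }, async (span) => {
    // The callback can enrich the span once more context is available...
    span?.addLabels({ thingId: id });
    const thing = await fetchThing(id);
    if (!thing) {
      // ...and can set the outcome itself; the wrapper's .then/.catch handlers
      // only write 'success'/'failure' when the outcome is still unknown.
      span?.setOutcome('failure');
    }
    return thing;
  });
}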
@@ -7,6 +7,7 @@

import type { PublicMethodsOf } from '@kbn/utility-types';
import { Logger, KibanaRequest } from 'src/core/server';
import { withSpan } from '@kbn/apm-utils';
import { validateParams, validateConfig, validateSecrets } from './validate_with_schema';
import {
  ActionTypeExecutorResult,

@@ -78,113 +79,135 @@ export class ActionExecutor {
      );
    }

    const {
      logger,
      spaces,
      getServices,
      encryptedSavedObjectsClient,
      actionTypeRegistry,
      eventLogger,
      preconfiguredActions,
      getActionsClientWithRequest,
    } = this.actionExecutorContext!;

    const services = getServices(request);
    const spaceId = spaces && spaces.getSpaceId(request);
    const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};

    const { actionTypeId, name, config, secrets } = await getActionInfo(
      await getActionsClientWithRequest(request, source),
      encryptedSavedObjectsClient,
      preconfiguredActions,
      actionId,
      namespace.namespace
    );

    if (!actionTypeRegistry.isActionExecutable(actionId, actionTypeId, { notifyUsage: true })) {
      actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
    }
    const actionType = actionTypeRegistry.get(actionTypeId);

    let validatedParams: Record<string, unknown>;
    let validatedConfig: Record<string, unknown>;
    let validatedSecrets: Record<string, unknown>;

    try {
      validatedParams = validateParams(actionType, params);
      validatedConfig = validateConfig(actionType, config);
      validatedSecrets = validateSecrets(actionType, secrets);
    } catch (err) {
      return { status: 'error', actionId, message: err.message, retry: false };
    }

    const actionLabel = `${actionTypeId}:${actionId}: ${name}`;
    logger.debug(`executing action ${actionLabel}`);

    const event: IEvent = {
      event: { action: EVENT_LOG_ACTIONS.execute },
      kibana: {
        saved_objects: [
          {
            rel: SAVED_OBJECT_REL_PRIMARY,
            type: 'action',
            id: actionId,
            ...namespace,
          },
        ],
    return withSpan(
      {
        name: `execute_action`,
        type: 'actions',
        labels: {
          actionId,
        },
      },
    };
      async (span) => {
        const {
          logger,
          spaces,
          getServices,
          encryptedSavedObjectsClient,
          actionTypeRegistry,
          eventLogger,
          preconfiguredActions,
          getActionsClientWithRequest,
        } = this.actionExecutorContext!;

    eventLogger.startTiming(event);
    let rawResult: ActionTypeExecutorResult<unknown>;
    try {
      rawResult = await actionType.executor({
        actionId,
        services,
        params: validatedParams,
        config: validatedConfig,
        secrets: validatedSecrets,
      });
    } catch (err) {
      rawResult = {
        actionId,
        status: 'error',
        message: 'an error occurred while running the action executor',
        serviceMessage: err.message,
        retry: false,
      };
    }
    eventLogger.stopTiming(event);
        const services = getServices(request);
        const spaceId = spaces && spaces.getSpaceId(request);
        const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};

    // allow null-ish return to indicate success
    const result = rawResult || {
      actionId,
      status: 'ok',
    };
        const { actionTypeId, name, config, secrets } = await getActionInfo(
          await getActionsClientWithRequest(request, source),
          encryptedSavedObjectsClient,
          preconfiguredActions,
          actionId,
          namespace.namespace
        );

    event.event = event.event || {};
        if (span) {
          span.name = `execute_action ${actionTypeId}`;
          span.addLabels({
            actionTypeId,
          });
        }

    if (result.status === 'ok') {
      event.event.outcome = 'success';
      event.message = `action executed: ${actionLabel}`;
    } else if (result.status === 'error') {
      event.event.outcome = 'failure';
      event.message = `action execution failure: ${actionLabel}`;
      event.error = event.error || {};
      event.error.message = actionErrorToMessage(result);
      logger.warn(`action execution failure: ${actionLabel}: ${event.error.message}`);
    } else {
      event.event.outcome = 'failure';
      event.message = `action execution returned unexpected result: ${actionLabel}: "${result.status}"`;
      event.error = event.error || {};
      event.error.message = 'action execution returned unexpected result';
      logger.warn(
        `action execution failure: ${actionLabel}: returned unexpected result "${result.status}"`
      );
    }
        if (!actionTypeRegistry.isActionExecutable(actionId, actionTypeId, { notifyUsage: true })) {
          actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
        }
        const actionType = actionTypeRegistry.get(actionTypeId);

    eventLogger.logEvent(event);
    return result;
        let validatedParams: Record<string, unknown>;
        let validatedConfig: Record<string, unknown>;
        let validatedSecrets: Record<string, unknown>;

        try {
          validatedParams = validateParams(actionType, params);
          validatedConfig = validateConfig(actionType, config);
          validatedSecrets = validateSecrets(actionType, secrets);
        } catch (err) {
          span?.setOutcome('failure');
          return { status: 'error', actionId, message: err.message, retry: false };
        }

        const actionLabel = `${actionTypeId}:${actionId}: ${name}`;
        logger.debug(`executing action ${actionLabel}`);

        const event: IEvent = {
          event: { action: EVENT_LOG_ACTIONS.execute },
          kibana: {
            saved_objects: [
              {
                rel: SAVED_OBJECT_REL_PRIMARY,
                type: 'action',
                id: actionId,
                ...namespace,
              },
            ],
          },
        };

        eventLogger.startTiming(event);
        let rawResult: ActionTypeExecutorResult<unknown>;
        try {
          rawResult = await actionType.executor({
            actionId,
            services,
            params: validatedParams,
            config: validatedConfig,
            secrets: validatedSecrets,
          });
        } catch (err) {
          rawResult = {
            actionId,
            status: 'error',
            message: 'an error occurred while running the action executor',
            serviceMessage: err.message,
            retry: false,
          };
        }
        eventLogger.stopTiming(event);

        // allow null-ish return to indicate success
        const result = rawResult || {
          actionId,
          status: 'ok',
        };

        event.event = event.event || {};

        if (result.status === 'ok') {
          span?.setOutcome('success');
          event.event.outcome = 'success';
          event.message = `action executed: ${actionLabel}`;
        } else if (result.status === 'error') {
          span?.setOutcome('failure');
          event.event.outcome = 'failure';
          event.message = `action execution failure: ${actionLabel}`;
          event.error = event.error || {};
          event.error.message = actionErrorToMessage(result);
          logger.warn(`action execution failure: ${actionLabel}: ${event.error.message}`);
        } else {
          span?.setOutcome('failure');
          event.event.outcome = 'failure';
          event.message = `action execution returned unexpected result: ${actionLabel}: "${result.status}"`;
          event.error = event.error || {};
          event.error.message = 'action execution returned unexpected result';
          logger.warn(
            `action execution failure: ${actionLabel}: returned unexpected result "${result.status}"`
          );
        }

        eventLogger.logEvent(event);
        return result;
      }
    );
  }
}
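In the action executor the whole execute() body now runs inside withSpan. The span starts under the generic name execute_action (only the actionId is known up front) and is renamed and labelled once the action's saved object has been loaded, while the outcome mirrors the event-log result. A condensed sketch of that rename-when-known pattern, using a hypothetical loader in place of getActionInfo:

import { withSpan } from '@kbn/apm-utils';

// Hypothetical stand-in for getActionInfo(); returns the action type id.
async function loadActionTypeId(actionId: string): Promise<string> {
  return '.server-log';
}

async function executeAction(actionId: string) {
  return withSpan(
    { name: 'execute_action', type: 'actions', labels: { actionId } },
    async (span) => {
      const actionTypeId = await loadActionTypeId(actionId);
      if (span) {
        // Refine the span once the action type is known.
        span.name = `execute_action ${actionTypeId}`;
        span.addLabels({ actionTypeId });
      }
      // ...run the action, then report success or failure via span?.setOutcome(...)
      // alongside the event-log outcome, as the hunk above does.
    }
  );
}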
@@ -20,10 +20,11 @@ import { mockLogger } from '../test_utils';
import { TaskClaiming, OwnershipClaimingOpts, TaskClaimingOpts } from './task_claiming';
import { Observable } from 'rxjs';
import { taskStoreMock } from '../task_store.mock';
import apm from 'elastic-apm-node';

const taskManagerLogger = mockLogger();

beforeEach(() => jest.resetAllMocks());
beforeEach(() => jest.clearAllMocks());

const mockedDate = new Date('2019-02-12T21:01:22.479Z');
// eslint-disable-next-line @typescript-eslint/no-explicit-any

@@ -52,7 +53,19 @@ taskDefinitions.registerTaskDefinitions({
  },
});

const mockApmTrans = {
  end: jest.fn(),
};

describe('TaskClaiming', () => {
  beforeEach(() => {
    jest.clearAllMocks();
    jest
      .spyOn(apm, 'startTransaction')
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      .mockImplementation(() => mockApmTrans as any);
  });

  test(`should log when a certain task type is skipped due to having a zero concurency configuration`, () => {
    const definitions = new TaskTypeDictionary(mockLogger());
    definitions.registerTaskDefinitions({

@@ -169,6 +182,12 @@ describe('TaskClaiming', () => {

    const results = await getAllAsPromise(taskClaiming.claimAvailableTasks(claimingOpts));

    expect(apm.startTransaction).toHaveBeenCalledWith(
      'markAvailableTasksAsClaimed',
      'taskManager markAvailableTasksAsClaimed'
    );
    expect(mockApmTrans.end).toHaveBeenCalledWith('success');

    expect(store.updateByQuery.mock.calls[0][1]).toMatchObject({
      max_docs: getCapacity(),
    });

@@ -187,6 +206,49 @@ describe('TaskClaiming', () => {
      }));
    }

    test('makes calls to APM as expected when markAvailableTasksAsClaimed throws error', async () => {
      const maxAttempts = _.random(2, 43);
      const customMaxAttempts = _.random(44, 100);

      const definitions = new TaskTypeDictionary(mockLogger());
      definitions.registerTaskDefinitions({
        foo: {
          title: 'foo',
          createTaskRunner: jest.fn(),
        },
        bar: {
          title: 'bar',
          maxAttempts: customMaxAttempts,
          createTaskRunner: jest.fn(),
        },
      });

      const { taskClaiming, store } = initialiseTestClaiming({
        storeOpts: {
          definitions,
        },
        taskClaimingOpts: {
          maxAttempts,
        },
      });

      store.updateByQuery.mockRejectedValue(new Error('Oh no'));

      await expect(
        getAllAsPromise(
          taskClaiming.claimAvailableTasks({
            claimOwnershipUntil: new Date(),
          })
        )
      ).rejects.toMatchInlineSnapshot(`[Error: Oh no]`);

      expect(apm.startTransaction).toHaveBeenCalledWith(
        'markAvailableTasksAsClaimed',
        'taskManager markAvailableTasksAsClaimed'
      );
      expect(mockApmTrans.end).toHaveBeenCalledWith('failure');
    });

    test('it filters claimed tasks down by supported types, maxAttempts, status, and runAt', async () => {
      const maxAttempts = _.random(2, 43);
      const customMaxAttempts = _.random(44, 100);

@@ -1105,6 +1167,7 @@ if (doc['task.runAt'].size()!=0) {
        startedAt: null,
        retryAt: null,
        scheduledAt: new Date(),
        traceparent: 'parent',
      },
      {
        id: 'claimed-by-schedule',

@@ -1121,6 +1184,7 @@ if (doc['task.runAt'].size()!=0) {
        startedAt: null,
        retryAt: null,
        scheduledAt: new Date(),
        traceparent: 'newParent',
      },
      {
        id: 'already-running',

@@ -1137,6 +1201,7 @@ if (doc['task.runAt'].size()!=0) {
        startedAt: null,
        retryAt: null,
        scheduledAt: new Date(),
        traceparent: '',
      },
    ];

@@ -1222,6 +1287,7 @@ if (doc['task.runAt'].size()!=0) {
          startedAt: null,
          retryAt: null,
          scheduledAt: new Date(),
          traceparent: 'parent',
        })
      )
    );

@@ -1277,6 +1343,7 @@ if (doc['task.runAt'].size()!=0) {
        startedAt: null,
        retryAt: null,
        scheduledAt: new Date(),
        traceparent: '',
      },
    ],
    // second cycle

@@ -1296,6 +1363,7 @@ if (doc['task.runAt'].size()!=0) {
        startedAt: null,
        retryAt: null,
        scheduledAt: new Date(),
        traceparent: '',
      },
    ],
  ],

@@ -1347,6 +1415,7 @@ if (doc['task.runAt'].size()!=0) {
          startedAt: null,
          retryAt: null,
          scheduledAt: new Date(),
          traceparent: '',
        }),
        errorType: TaskClaimErrorType.CLAIMED_BY_ID_OUT_OF_CAPACITY,
      })

@@ -1393,6 +1462,7 @@ if (doc['task.runAt'].size()!=0) {
          startedAt: null,
          retryAt: null,
          scheduledAt: new Date(),
          traceparent: 'newParent',
        })
      )
    );

@@ -1437,6 +1507,7 @@ if (doc['task.runAt'].size()!=0) {
          startedAt: null,
          retryAt: null,
          scheduledAt: new Date(),
          traceparent: '',
        }),
        errorType: TaskClaimErrorType.CLAIMED_BY_ID_NOT_IN_CLAIMING_STATUS,
      })

@@ -1499,6 +1570,7 @@ function mockInstance(instance: Partial<ConcreteTaskInstance> = {}) {
      status: 'idle',
      user: 'example',
      ownerId: null,
      traceparent: '',
    },
    instance
  );
@@ -379,34 +379,41 @@ export class TaskClaiming {
      sort.unshift('_score');
    }

    const apmTrans = apm.startTransaction(`taskManager markAvailableTasksAsClaimed`, 'taskManager');
    const result = await this.taskStore.updateByQuery(
      {
        query: matchesClauses(
          claimTasksById && claimTasksById.length
            ? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
            : queryForScheduledTasks,
          filterDownBy(InactiveTasks)
        ),
        script: updateFieldsAndMarkAsFailed(
          {
            ownerId: this.taskStore.taskManagerId,
            retryAt: claimOwnershipUntil,
          },
          claimTasksById || [],
          taskTypesToClaim,
          taskTypesToSkip,
          pick(this.taskMaxAttempts, taskTypesToClaim)
        ),
        sort,
      },
      {
        max_docs: size,
      }
    const apmTrans = apm.startTransaction(
      'markAvailableTasksAsClaimed',
      `taskManager markAvailableTasksAsClaimed`
    );

    if (apmTrans) apmTrans.end();
    return result;
    try {
      const result = await this.taskStore.updateByQuery(
        {
          query: matchesClauses(
            claimTasksById && claimTasksById.length
              ? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
              : queryForScheduledTasks,
            filterDownBy(InactiveTasks)
          ),
          script: updateFieldsAndMarkAsFailed(
            {
              ownerId: this.taskStore.taskManagerId,
              retryAt: claimOwnershipUntil,
            },
            claimTasksById || [],
            taskTypesToClaim,
            taskTypesToSkip,
            pick(this.taskMaxAttempts, taskTypesToClaim)
          ),
          sort,
        },
        {
          max_docs: size,
        }
      );
      apmTrans?.end('success');
      return result;
    } catch (err) {
      apmTrans?.end('failure');
      throw err;
    }
  }

  /**
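Task claiming now ends its transaction with an explicit result on both paths, which is what the new task_claiming tests assert. The pattern, reduced to a skeleton (the work callback is a placeholder for the updateByQuery call):

import apm from 'elastic-apm-node';

async function markAvailableTasksAsClaimed<T>(work: () => Promise<T>): Promise<T> {
  const apmTrans = apm.startTransaction(
    'markAvailableTasksAsClaimed',
    'taskManager markAvailableTasksAsClaimed'
  );
  try {
    const result = await work();
    apmTrans?.end('success'); // startTransaction can return null, hence the optional call
    return result;
  } catch (err) {
    apmTrans?.end('failure');
    throw err;
  }
}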
@@ -29,6 +29,9 @@
      "status": {
        "type": "keyword"
      },
      "traceparent": {
        "type": "text"
      },
      "params": {
        "type": "text"
      },
@@ -257,6 +257,11 @@ export interface TaskInstance {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  state: Record<string, any>;

  /**
   * The serialized traceparent string of the current APM transaction or span.
   */
  traceparent?: string;

  /**
   * The id of the user who scheduled this task.
   */

@@ -364,6 +369,7 @@ export type SerializedConcreteTaskInstance = Omit<
> & {
  state: string;
  params: string;
  traceparent: string;
  scheduledAt: string;
  startedAt: string | null;
  retryAt: string | null;
@@ -18,6 +18,7 @@ import { TaskDefinitionRegistry, TaskTypeDictionary } from '../task_type_diction
import { mockLogger } from '../test_utils';
import { throwUnrecoverableError } from './errors';
import { taskStoreMock } from '../task_store.mock';
import apm from 'elastic-apm-node';

const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);

@@ -32,8 +33,70 @@ afterAll(() => fakeTimer.restore());
describe('TaskManagerRunner', () => {
  const pendingStageSetup = (opts: TestOpts) => testOpts(TaskRunningStage.PENDING, opts);
  const readyToRunStageSetup = (opts: TestOpts) => testOpts(TaskRunningStage.READY_TO_RUN, opts);
  const mockApmTrans = {
    end: jest.fn(),
  };

  describe('Pending Stage', () => {
    beforeEach(() => {
      jest.clearAllMocks();
      jest
        .spyOn(apm, 'startTransaction')
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        .mockImplementation(() => mockApmTrans as any);
    });
    test('makes calls to APM as expected when task markedAsRunning is success', async () => {
      const { runner } = await pendingStageSetup({
        instance: {
          schedule: {
            interval: '10m',
          },
        },
        definitions: {
          bar: {
            title: 'Bar!',
            createTaskRunner: () => ({
              run: async () => undefined,
            }),
          },
        },
      });
      await runner.markTaskAsRunning();
      expect(apm.startTransaction).toHaveBeenCalledWith(
        'taskManager',
        'taskManager markTaskAsRunning'
      );
      expect(mockApmTrans.end).toHaveBeenCalledWith('success');
    });
    test('makes calls to APM as expected when task markedAsRunning fails', async () => {
      const { runner, store } = await pendingStageSetup({
        instance: {
          schedule: {
            interval: '10m',
          },
        },
        definitions: {
          bar: {
            title: 'Bar!',
            createTaskRunner: () => ({
              run: async () => undefined,
            }),
          },
        },
      });
      store.update.mockRejectedValue(
        SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id')
      );
      await expect(runner.markTaskAsRunning()).rejects.toMatchInlineSnapshot(
        `[Error: Saved object [type/id] not found]`
      );
      // await runner.markTaskAsRunning();
      expect(apm.startTransaction).toHaveBeenCalledWith(
        'taskManager',
        'taskManager markTaskAsRunning'
      );
      expect(mockApmTrans.end).toHaveBeenCalledWith('failure');
    });
    test('provides details about the task that is running', async () => {
      const { runner } = await pendingStageSetup({
        instance: {

@@ -572,6 +635,55 @@ describe('TaskManagerRunner', () => {
  });

  describe('Ready To Run Stage', () => {
    beforeEach(() => {
      jest.clearAllMocks();
    });
    test('makes calls to APM as expected when task runs successfully', async () => {
      const { runner } = await readyToRunStageSetup({
        instance: {
          params: { a: 'b' },
          state: { hey: 'there' },
        },
        definitions: {
          bar: {
            title: 'Bar!',
            createTaskRunner: () => ({
              async run() {
                return { state: {} };
              },
            }),
          },
        },
      });
      await runner.run();
      expect(apm.startTransaction).toHaveBeenCalledWith('bar', 'taskManager run', {
        childOf: 'apmTraceparent',
      });
      expect(mockApmTrans.end).toHaveBeenCalledWith('success');
    });
    test('makes calls to APM as expected when task fails', async () => {
      const { runner } = await readyToRunStageSetup({
        instance: {
          params: { a: 'b' },
          state: { hey: 'there' },
        },
        definitions: {
          bar: {
            title: 'Bar!',
            createTaskRunner: () => ({
              async run() {
                throw new Error('rar');
              },
            }),
          },
        },
      });
      await runner.run();
      expect(apm.startTransaction).toHaveBeenCalledWith('bar', 'taskManager run', {
        childOf: 'apmTraceparent',
      });
      expect(mockApmTrans.end).toHaveBeenCalledWith('failure');
    });
    test('queues a reattempt if the task fails', async () => {
      const initialAttempts = _.random(0, 2);
      const id = Date.now().toString();

@@ -1275,6 +1387,7 @@ describe('TaskManagerRunner', () => {
      status: 'idle',
      user: 'example',
      ownerId: null,
      traceparent: 'apmTraceparent',
    },
    instance
  );
@@ -12,6 +12,7 @@
 */

import apm from 'elastic-apm-node';
import { withSpan } from '@kbn/apm-utils';
import { performance } from 'perf_hooks';
import { identity, defaults, flow } from 'lodash';
import { Logger, SavedObjectsErrorHelpers } from '../../../../../src/core/server';

@@ -242,30 +243,38 @@ export class TaskManagerRunner implements TaskRunner {
      );
    }
    this.logger.debug(`Running task ${this}`);

    const apmTrans = apm.startTransaction(this.taskType, 'taskManager run', {
      childOf: this.instance.task.traceparent,
    });

    const modifiedContext = await this.beforeRun({
      taskInstance: this.instance.task,
    });

    const stopTaskTimer = startTaskTimer();
    const apmTrans = apm.startTransaction(`taskManager run`, 'taskManager');
    apmTrans?.addLabels({
      taskType: this.taskType,
    });

    try {
      this.task = this.definition.createTaskRunner(modifiedContext);
      const result = await this.task.run();
      const result = await withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run());
      const validatedResult = this.validateResult(result);
      const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
        this.processResult(validatedResult, stopTaskTimer())
      );
      if (apmTrans) apmTrans.end('success');
      return this.processResult(validatedResult, stopTaskTimer());
      return processedResult;
    } catch (err) {
      this.logger.error(`Task ${this} failed: ${err}`);
      // in error scenario, we can not get the RunResult
      // re-use modifiedContext's state, which is correct as of beforeRun
      if (apmTrans) apmTrans.end('error');
      return this.processResult(
        asErr({ error: err, state: modifiedContext.taskInstance.state }),
        stopTaskTimer()
      const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
        this.processResult(
          asErr({ error: err, state: modifiedContext.taskInstance.state }),
          stopTaskTimer()
        )
      );
      if (apmTrans) apmTrans.end('failure');
      return processedResult;
    }
  }

@@ -285,10 +294,7 @@ export class TaskManagerRunner implements TaskRunner {
    }
    performance.mark('markTaskAsRunning_start');

    const apmTrans = apm.startTransaction(`taskManager markTaskAsRunning`, 'taskManager');
    apmTrans?.addLabels({
      taskType: this.taskType,
    });
    const apmTrans = apm.startTransaction('taskManager', 'taskManager markTaskAsRunning');

    const now = new Date();
    try {
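The task runner ties the pieces together: the transaction is named after the task type, linked to the scheduling trace via childOf, the run and result-processing steps become child spans, and the transaction ends as 'success' or 'failure'. A reduced sketch under those assumptions (RunnableTask and the rethrow on error are simplifications; the real run() converts the error into a processed result instead):

import apm from 'elastic-apm-node';
import { withSpan } from '@kbn/apm-utils';

// Hypothetical slice of what the runner needs from a claimed task.
interface RunnableTask {
  taskType: string;
  traceparent: string;
  run(): Promise<unknown>;
}

async function runInstrumented(task: RunnableTask) {
  const apmTrans = apm.startTransaction(task.taskType, 'taskManager run', {
    childOf: task.traceparent,
  });
  try {
    const result = await withSpan({ name: 'run', type: 'task manager' }, () => task.run());
    const processed = await withSpan({ name: 'process result', type: 'task manager' }, async () => {
      // Placeholder for validateResult() + processResult().
      return result;
    });
    apmTrans?.end('success');
    return processed;
  } catch (err) {
    apmTrans?.end('failure');
    throw err;
  }
}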
@@ -27,6 +27,10 @@ import { TaskRunResult } from './task_running';
import { mockLogger } from './test_utils';
import { TaskTypeDictionary } from './task_type_dictionary';

jest.mock('elastic-apm-node', () => ({
  currentTraceparent: 'parent',
}));

describe('TaskScheduling', () => {
  const mockTaskStore = taskStoreMock.create({});
  const mockTaskManager = taskPollingLifecycleMock.create({});

@@ -60,6 +64,12 @@ describe('TaskScheduling', () => {
    };
    await taskScheduling.schedule(task);
    expect(mockTaskStore.schedule).toHaveBeenCalled();
    expect(mockTaskStore.schedule).toHaveBeenCalledWith({
      ...task,
      id: undefined,
      schedule: undefined,
      traceparent: 'parent',
    });
  });

  test('allows scheduling existing tasks that may have already been scheduled', async () => {

@@ -420,6 +430,7 @@ function mockTask(overrides: Partial<ConcreteTaskInstance> = {}): ConcreteTaskIn
    startedAt: null,
    retryAt: null,
    scheduledAt: new Date(),
    traceparent: 'taskTraceparent',
    ...overrides,
  };
}
@@ -10,6 +10,7 @@ import { filter } from 'rxjs/operators';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, map as mapOptional, getOrElse, isSome } from 'fp-ts/lib/Option';

import agent from 'elastic-apm-node';
import { Logger } from '../../../../src/core/server';
import { asOk, either, map, mapErr, promiseResult } from './lib/result_type';
import {

@@ -85,7 +86,10 @@ export class TaskScheduling {
      ...options,
      taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
    });
    return await this.store.schedule(modifiedTask);
    return await this.store.schedule({
      ...modifiedTask,
      traceparent: agent.currentTraceparent ?? '',
    });
  }

  /**
@@ -94,6 +94,7 @@ describe('TaskStore', () => {
        params: { hello: 'world' },
        state: { foo: 'bar' },
        taskType: 'report',
        traceparent: 'apmTraceparent',
      };
      const result = await testSchedule(task);

@@ -112,6 +113,7 @@ describe('TaskStore', () => {
          status: 'idle',
          taskType: 'report',
          user: undefined,
          traceparent: 'apmTraceparent',
        },
        {
          id: 'id',

@@ -134,6 +136,7 @@ describe('TaskStore', () => {
        taskType: 'report',
        user: undefined,
        version: '123',
        traceparent: 'apmTraceparent',
      });
    });

@@ -285,6 +288,7 @@ describe('TaskStore', () => {
        status: 'idle' as TaskStatus,
        version: '123',
        ownerId: null,
        traceparent: 'myTraceparent',
      };

      savedObjectsClient.update.mockImplementation(

@@ -318,6 +322,7 @@ describe('TaskStore', () => {
          taskType: task.taskType,
          user: undefined,
          ownerId: null,
          traceparent: 'myTraceparent',
        },
        { version: '123', refresh: false }
      );

@@ -347,6 +352,7 @@ describe('TaskStore', () => {
        status: 'idle' as TaskStatus,
        version: '123',
        ownerId: null,
        traceparent: '',
      };

      const firstErrorPromise = store.errors$.pipe(first()).toPromise();

@@ -384,6 +390,7 @@ describe('TaskStore', () => {
        status: 'idle' as TaskStatus,
        version: '123',
        ownerId: null,
        traceparent: '',
      };

      const firstErrorPromise = store.errors$.pipe(first()).toPromise();

@@ -500,6 +507,7 @@ describe('TaskStore', () => {
        status: status as TaskStatus,
        version: '123',
        ownerId: null,
        traceparent: 'myTraceparent',
      };

      savedObjectsClient.get.mockImplementation(async (type: string, objectId: string) => ({