[Response Ops] Capture durations for different phases in alerting task runner (#139323)

* Adding rule run timer to calculate durations for specific spans

* Rename to TaskRunnerTimer. Update event log schema

* Adding unit tests

* Updating functional tests

* Fixing tests

* Fixing duration calculation

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Authored by Ying Mao on 2022-08-29 09:11:56 -04:00, committed by GitHub
parent c4fa211b6a
commit ee6318a2da
15 changed files with 796 additions and 280 deletions
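At a high level, this change introduces a TaskRunnerTimer that wraps each phase of a rule run and records a duration per named span; the collected timings are passed to the alerting event logger and merged into the execution metrics of the event log document. The TypeScript sketch below illustrates the pattern using the API added in the diffs that follow; the helper name runWithPhaseTimings and the simplified flow are illustrative, not the actual task runner code.

import { Logger } from '@kbn/core/server';
import { TaskRunnerTimer, TaskRunnerTimerSpan } from './task_runner_timer';

// Illustrative helper: how the task runner records per-phase durations.
async function runWithPhaseTimings(logger: Logger, startedAt: Date) {
  const timer = new TaskRunnerTimer({ logger });

  // Time elapsed between the task being claimed and the run actually starting.
  timer.setDuration(TaskRunnerTimerSpan.StartTaskRun, startedAt);

  // Each phase is wrapped so its duration is stored under a named span.
  await timer.runWithTimer(TaskRunnerTimerSpan.PrepareRule, async () => {
    // load and validate the rule, initialize the event logger, ...
  });
  await timer.runWithTimer(TaskRunnerTimerSpan.RuleTypeRun, async () => {
    // invoke the rule type executor
  });

  // Overall duration from task claim to completion.
  timer.setDuration(TaskRunnerTimerSpan.TotalRunDuration, startedAt);

  // The timings are merged into kibana.alert.rule.execution.metrics when
  // AlertingEventLogger.done({ status, metrics, timings: timer.toJson() }) is called.
  return timer.toJson();
}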


@ -26,6 +26,7 @@ import {
} from '../../types';
import { RuleRunMetrics } from '../rule_run_metrics_store';
import { EVENT_LOG_ACTIONS } from '../../plugin';
import { TaskRunnerTimerSpan } from '../../task_runner/task_runner_timer';
const mockNow = '2020-01-01T02:00:00.000Z';
const eventLogger = eventLoggerMock.create();
@ -691,6 +692,118 @@ describe('AlertingEventLogger', () => {
expect(eventLogger.logEvent).toHaveBeenCalledWith(loggedEvent);
});
test('should set fields from execution timings if provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.done({
timings: {
[TaskRunnerTimerSpan.StartTaskRun]: 10,
[TaskRunnerTimerSpan.TotalRunDuration]: 20,
[TaskRunnerTimerSpan.PrepareRule]: 30,
[TaskRunnerTimerSpan.RuleTypeRun]: 40,
[TaskRunnerTimerSpan.ProcessAlerts]: 50,
[TaskRunnerTimerSpan.TriggerActions]: 60,
[TaskRunnerTimerSpan.ProcessRuleRun]: 70,
},
});
const event = initializeExecuteRecord(contextWithScheduleDelay);
const loggedEvent = {
...event,
kibana: {
...event.kibana,
alert: {
...event.kibana?.alert,
rule: {
...event.kibana?.alert?.rule,
execution: {
...event.kibana?.alert?.rule?.execution,
metrics: {
claim_to_start_duration_ms: 10,
total_run_duration_ms: 20,
prepare_rule_duration_ms: 30,
rule_type_run_duration_ms: 40,
process_alerts_duration_ms: 50,
trigger_actions_duration_ms: 60,
process_rule_duration_ms: 70,
},
},
},
},
},
};
expect(alertingEventLogger.getEvent()).toEqual(loggedEvent);
expect(eventLogger.logEvent).toHaveBeenCalledWith(loggedEvent);
});
test('should set fields from execution metrics and timings if both provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.done({
metrics: {
numberOfTriggeredActions: 1,
numberOfGeneratedActions: 2,
numberOfActiveAlerts: 3,
numberOfNewAlerts: 4,
numberOfRecoveredAlerts: 5,
numSearches: 6,
esSearchDurationMs: 3300,
totalSearchDurationMs: 10333,
hasReachedAlertLimit: false,
triggeredActionsStatus: ActionsCompletion.COMPLETE,
},
timings: {
[TaskRunnerTimerSpan.StartTaskRun]: 10,
[TaskRunnerTimerSpan.TotalRunDuration]: 20,
[TaskRunnerTimerSpan.PrepareRule]: 30,
[TaskRunnerTimerSpan.RuleTypeRun]: 40,
[TaskRunnerTimerSpan.ProcessAlerts]: 50,
[TaskRunnerTimerSpan.TriggerActions]: 60,
[TaskRunnerTimerSpan.ProcessRuleRun]: 70,
},
});
const event = initializeExecuteRecord(contextWithScheduleDelay);
const loggedEvent = {
...event,
kibana: {
...event.kibana,
alert: {
...event.kibana?.alert,
rule: {
...event.kibana?.alert?.rule,
execution: {
...event.kibana?.alert?.rule?.execution,
metrics: {
number_of_triggered_actions: 1,
number_of_generated_actions: 2,
alert_counts: {
active: 3,
new: 4,
recovered: 5,
},
number_of_searches: 6,
es_search_duration_ms: 3300,
total_search_duration_ms: 10333,
claim_to_start_duration_ms: 10,
total_run_duration_ms: 20,
prepare_rule_duration_ms: 30,
rule_type_run_duration_ms: 40,
process_alerts_duration_ms: 50,
trigger_actions_duration_ms: 60,
process_rule_duration_ms: 70,
},
},
},
},
},
};
expect(alertingEventLogger.getEvent()).toEqual(loggedEvent);
expect(eventLogger.logEvent).toHaveBeenCalledWith(loggedEvent);
});
test('should set fields to 0 if execution metrics are provided but undefined', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();


@ -5,9 +5,15 @@
* 2.0.
*/
import { IEvent, IEventLogger, SAVED_OBJECT_REL_PRIMARY } from '@kbn/event-log-plugin/server';
import {
IEvent,
IEventLogger,
millisToNanos,
SAVED_OBJECT_REL_PRIMARY,
} from '@kbn/event-log-plugin/server';
import { EVENT_LOG_ACTIONS } from '../../plugin';
import { UntypedNormalizedRuleType } from '../../rule_type_registry';
import { TaskRunnerTimings } from '../../task_runner/task_runner_timer';
import { AlertInstanceState, RuleExecutionStatus } from '../../types';
import { createAlertEventLogRecordObject } from '../create_alert_event_log_record_object';
import { RuleRunMetrics } from '../rule_run_metrics_store';
@ -31,6 +37,7 @@ type RuleContext = RuleContextOpts & {
};
interface DoneOpts {
timings?: TaskRunnerTimings;
status?: RuleExecutionStatus;
metrics?: RuleRunMetrics | null;
}
@ -100,7 +107,12 @@ export class AlertingEventLogger {
}
public getStartAndDuration(): { start?: Date; duration?: string | number } {
return { start: this.startTime, duration: this.event?.event?.duration };
return {
start: this.startTime,
duration: this.startTime
? millisToNanos(new Date().getTime() - this.startTime!.getTime())
: '0',
};
}
public setRuleName(ruleName: string) {
@ -152,7 +164,7 @@ export class AlertingEventLogger {
this.eventLogger.logEvent(createActionExecuteRecord(this.ruleContext, action));
}
public done({ status, metrics }: DoneOpts) {
public done({ status, metrics, timings }: DoneOpts) {
if (!this.isInitialized || !this.event || !this.ruleContext) {
throw new Error('AlertingEventLogger not initialized');
}
@ -187,6 +199,10 @@ export class AlertingEventLogger {
updateEvent(this.event, { metrics });
}
if (timings) {
updateEvent(this.event, { timings });
}
this.eventLogger.logEvent(this.event);
}
}
@ -324,9 +340,10 @@ interface UpdateEventOpts {
status?: string;
reason?: string;
metrics?: RuleRunMetrics;
timings?: TaskRunnerTimings;
}
export function updateEvent(event: IEvent, opts: UpdateEventOpts) {
const { message, outcome, error, ruleName, status, reason, metrics } = opts;
const { message, outcome, error, ruleName, status, reason, metrics, timings } = opts;
if (!event) {
throw new Error('Cannot update event because it is not initialized.');
}
@ -368,6 +385,7 @@ export function updateEvent(event: IEvent, opts: UpdateEventOpts) {
event.kibana.alert.rule = event.kibana.alert.rule || {};
event.kibana.alert.rule.execution = event.kibana.alert.rule.execution || {};
event.kibana.alert.rule.execution.metrics = {
...event.kibana.alert.rule.execution.metrics,
number_of_triggered_actions: metrics.numberOfTriggeredActions
? metrics.numberOfTriggeredActions
: 0,
@ -384,4 +402,15 @@ export function updateEvent(event: IEvent, opts: UpdateEventOpts) {
total_search_duration_ms: metrics.totalSearchDurationMs ? metrics.totalSearchDurationMs : 0,
};
}
if (timings) {
event.kibana = event.kibana || {};
event.kibana.alert = event.kibana.alert || {};
event.kibana.alert.rule = event.kibana.alert.rule || {};
event.kibana.alert.rule.execution = event.kibana.alert.rule.execution || {};
event.kibana.alert.rule.execution.metrics = {
...event.kibana.alert.rule.execution.metrics,
...timings,
};
}
}


@ -83,6 +83,18 @@ describe('RuleRunMetricsStore', () => {
expect(ruleRunMetricsStore.getHasReachedAlertLimit()).toBe(true);
});
test('sets search metrics', () => {
const metricsStore = new RuleRunMetricsStore();
metricsStore.setSearchMetrics([
{ numSearches: 2, totalSearchDurationMs: 2222, esSearchDurationMs: 222 },
{ numSearches: 3, totalSearchDurationMs: 3333, esSearchDurationMs: 333 },
]);
expect(metricsStore.getNumSearches()).toEqual(5);
expect(metricsStore.getTotalSearchDurationMs()).toEqual(5555);
expect(metricsStore.getEsSearchDurationMs()).toEqual(555);
});
test('gets metrics', () => {
expect(ruleRunMetricsStore.getMetrics()).toEqual({
triggeredActionsStatus: 'partial',
@ -104,7 +116,22 @@ describe('RuleRunMetricsStore', () => {
expect(ruleRunMetricsStore.getNumberOfTriggeredActions()).toBe(6);
});
test('increments incrementNumberOfGeneratedActions by x', () => {
test('increments numSearches by x', () => {
ruleRunMetricsStore.incrementNumSearches(3);
expect(ruleRunMetricsStore.getNumSearches()).toBe(4);
});
test('increments totalSearchDurationMs by x', () => {
ruleRunMetricsStore.incrementTotalSearchDurationMs(2454);
expect(ruleRunMetricsStore.getTotalSearchDurationMs()).toBe(2456);
});
test('increments incrementEsSearchDurationMs by x', () => {
ruleRunMetricsStore.incrementEsSearchDurationMs(78758);
expect(ruleRunMetricsStore.getEsSearchDurationMs()).toBe(78761);
});
test('increments numberOfGeneratedActions by x', () => {
ruleRunMetricsStore.incrementNumberOfGeneratedActions(2);
expect(ruleRunMetricsStore.getNumberOfGeneratedActions()).toBe(17);
});


@ -8,6 +8,7 @@
import { set } from 'lodash';
import { ActionsCompletion } from '../types';
import { ActionsConfigMap } from './get_actions_config_map';
import { SearchMetrics } from './types';
interface State {
numSearches: number;
@ -91,6 +92,13 @@ export class RuleRunMetricsStore {
};
// Setters
public setSearchMetrics = (searchMetrics: SearchMetrics[]) => {
for (const metric of searchMetrics) {
this.incrementNumSearches(metric.numSearches ?? 0);
this.incrementTotalSearchDurationMs(metric.totalSearchDurationMs ?? 0);
this.incrementEsSearchDurationMs(metric.esSearchDurationMs ?? 0);
}
};
public setNumSearches = (numSearches: number) => {
this.state.numSearches = numSearches;
};
@ -151,6 +159,15 @@ export class RuleRunMetricsStore {
this.state.connectorTypes[actionTypeId]?.triggeredActionsStatus === ActionsCompletion.PARTIAL;
// Incrementer
public incrementNumSearches = (incrementBy: number) => {
this.state.numSearches += incrementBy;
};
public incrementTotalSearchDurationMs = (incrementBy: number) => {
this.state.totalSearchDurationMs += incrementBy;
};
public incrementEsSearchDurationMs = (incrementBy: number) => {
this.state.esSearchDurationMs += incrementBy;
};
public incrementNumberOfTriggeredActions = () => {
this.state.numberOfTriggeredActions++;
};
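The new setSearchMetrics setter sums the metrics from each wrapped search client rather than replacing them, which lets the task runner pass the scoped cluster client and search source client metrics in a single call. A small illustrative usage, consistent with the unit test above (the values and import path are made up for the example):

import { RuleRunMetricsStore } from './rule_run_metrics_store';

// Illustrative: aggregate search metrics reported by two wrapped clients.
const metricsStore = new RuleRunMetricsStore();
metricsStore.setSearchMetrics([
  { numSearches: 2, totalSearchDurationMs: 120, esSearchDurationMs: 80 },
  { numSearches: 1, totalSearchDurationMs: 45, esSearchDurationMs: 30 },
]);

metricsStore.getNumSearches(); // 3
metricsStore.getTotalSearchDurationMs(); // 165
metricsStore.getEsSearchDurationMs(); // 110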


@ -2896,6 +2896,15 @@ describe('Task Runner', () => {
reason: errorReason,
},
},
timings: {
claim_to_start_duration_ms: 0,
prepare_rule_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
total_run_duration_ms: 0,
trigger_actions_duration_ms: 0,
},
});
} else if (status === 'warning') {
expect(alertingEventLogger.done).toHaveBeenCalledWith({
@ -2919,6 +2928,15 @@ describe('Task Runner', () => {
reason: errorReason,
},
},
timings: {
claim_to_start_duration_ms: 0,
prepare_rule_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
total_run_duration_ms: 0,
trigger_actions_duration_ms: 0,
},
});
} else {
expect(alertingEventLogger.done).toHaveBeenCalledWith({
@ -2938,6 +2956,15 @@ describe('Task Runner', () => {
lastExecutionDate: new Date('1970-01-01T00:00:00.000Z'),
status,
},
timings: {
claim_to_start_duration_ms: 0,
prepare_rule_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
total_run_duration_ms: 0,
trigger_actions_duration_ms: 0,
},
});
}


@ -36,10 +36,8 @@ import {
RuleMonitoringHistory,
RuleTaskState,
RuleTypeRegistry,
SanitizedRule,
RulesClientApi,
} from '../types';
import { asErr, asOk, map, promiseResult, resolveErr, Resultable } from '../lib/result_type';
import { asErr, asOk, map, resolveErr, Result } from '../lib/result_type';
import { getExecutionDurationPercentiles, getExecutionSuccessRatio } from '../lib/monitoring';
import { taskInstanceToAlertTaskInstance } from './alert_task_instance';
import { isAlertSavedObjectNotFoundError, isEsUnavailableError } from '../lib/is_alerting_error';
@ -58,18 +56,18 @@ import { InMemoryMetrics, IN_MEMORY_METRICS } from '../monitoring';
import {
RuleTaskInstance,
RuleTaskRunResult,
RuleRunResult,
RuleTaskStateAndMetrics,
RunRuleParams,
} from './types';
import { createWrappedScopedClusterClientFactory } from '../lib/wrap_scoped_cluster_client';
import { IExecutionStatusAndMetrics } from '../lib/rule_execution_status';
import { RuleRunMetricsStore } from '../lib/rule_run_metrics_store';
import { wrapSearchSourceClient } from '../lib/wrap_search_source_client';
import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger';
import { SearchMetrics } from '../lib/types';
import { loadRule } from './rule_loader';
import { logAlerts } from './log_alerts';
import { scheduleActionsForAlerts } from './schedule_actions_for_alerts';
import { TaskRunnerTimer, TaskRunnerTimerSpan } from './task_runner_timer';
const FALLBACK_RETRY_INTERVAL = '5m';
const CONNECTIVITY_RETRY_INTERVAL = '5m';
@ -109,6 +107,8 @@ export class TaskRunner<
private readonly ruleTypeRegistry: RuleTypeRegistry;
private readonly inMemoryMetrics: InMemoryMetrics;
private readonly maxAlerts: number;
private alerts: Record<string, Alert<State, Context>>;
private timer: TaskRunnerTimer;
private alertingEventLogger: AlertingEventLogger;
private usageCounter?: UsageCounter;
private searchAbortController: AbortController;
@ -140,6 +140,8 @@ export class TaskRunner<
this.executionId = uuid.v4();
this.inMemoryMetrics = inMemoryMetrics;
this.maxAlerts = context.maxAlerts;
this.alerts = {};
this.timer = new TaskRunnerTimer({ logger: this.logger });
this.alertingEventLogger = new AlertingEventLogger(this.context.eventLogger);
}
@ -224,14 +226,24 @@ export class TaskRunner<
}
}
private async executeRule(
fakeRequest: KibanaRequest,
rulesClient: RulesClientApi,
rule: SanitizedRule<Params>,
apiKey: RawRule['apiKey'],
params: Params,
spaceId: string
): Promise<RuleTaskStateAndMetrics> {
private async runRule({
fakeRequest,
rulesClient,
rule,
apiKey,
validatedParams: params,
}: RunRuleParams<Params>): Promise<RuleTaskStateAndMetrics> {
if (apm.currentTransaction) {
apm.currentTransaction.name = `Execute Alerting Rule: "${rule.name}"`;
apm.currentTransaction.addLabels({
alerting_rule_consumer: rule.consumer,
alerting_rule_name: rule.name,
alerting_rule_tags: rule.tags.join(', '),
alerting_rule_type_id: rule.alertTypeId,
alerting_rule_params: JSON.stringify(rule.params),
});
}
const {
alertTypeId: ruleTypeId,
consumer,
@ -249,7 +261,7 @@ export class TaskRunner<
actions,
} = rule;
const {
params: { alertId: ruleId },
params: { alertId: ruleId, spaceId },
state: {
alertInstances: alertRawInstances = {},
alertTypeState: ruleTypeState = {},
@ -259,29 +271,9 @@ export class TaskRunner<
const ruleRunMetricsStore = new RuleRunMetricsStore();
const executionHandler = this.getExecutionHandler(
ruleId,
rule.name,
rule.tags,
spaceId,
apiKey,
this.context.kibanaBaseUrl,
rule.actions,
rule.params,
fakeRequest
);
const namespace = this.context.spaceIdToNamespace(spaceId);
const ruleType = this.ruleTypeRegistry.get(ruleTypeId);
const alerts: Record<string, Alert<State, Context>> = {};
for (const id in alertRawInstances) {
if (alertRawInstances.hasOwnProperty(id)) {
alerts[id] = new Alert<State, Context>(id, alertRawInstances[id]);
}
}
const originalAlerts = cloneDeep(alerts);
const ruleLabel = `${this.ruleType.id}:${ruleId}: '${name}'`;
const wrappedClientOptions = {
@ -305,17 +297,27 @@ export class TaskRunner<
searchSourceClient,
});
const { updatedRuleTypeState, hasReachedAlertLimit, originalAlerts } =
await this.timer.runWithTimer(TaskRunnerTimerSpan.RuleTypeRun, async () => {
for (const id in alertRawInstances) {
if (alertRawInstances.hasOwnProperty(id)) {
this.alerts[id] = new Alert<State, Context>(id, alertRawInstances[id]);
}
}
const alertsCopy = cloneDeep(this.alerts);
const alertFactory = createAlertFactory<
State,
Context,
WithoutReservedActionGroups<ActionGroupIds, RecoveryActionGroupId>
>({
alerts,
alerts: this.alerts,
logger: this.logger,
maxAlerts: this.maxAlerts,
canSetRecoveryContext: ruleType.doesSetRecoveryContext ?? false,
});
let updatedRuleTypeState: void | Record<string, unknown>;
let updatedState: void | Record<string, unknown>;
try {
const ctx = {
type: 'alert',
@ -330,7 +332,7 @@ export class TaskRunner<
includedHiddenTypes: ['alert', 'action'],
});
updatedRuleTypeState = await this.context.executionContext.withContext(ctx, () =>
updatedState = await this.context.executionContext.withContext(ctx, () =>
this.ruleType.executor({
alertId: ruleId,
executionId: this.executionId,
@ -394,46 +396,65 @@ export class TaskRunner<
this.alertingEventLogger.setExecutionSucceeded(`rule executed: ${ruleLabel}`);
const scopedClusterClientMetrics = wrappedScopedClusterClient.getMetrics();
const searchSourceClientMetrics = wrappedSearchSourceClient.getMetrics();
const searchMetrics: SearchMetrics = {
numSearches: scopedClusterClientMetrics.numSearches + searchSourceClientMetrics.numSearches,
totalSearchDurationMs:
scopedClusterClientMetrics.totalSearchDurationMs +
searchSourceClientMetrics.totalSearchDurationMs,
esSearchDurationMs:
scopedClusterClientMetrics.esSearchDurationMs +
searchSourceClientMetrics.esSearchDurationMs,
};
ruleRunMetricsStore.setSearchMetrics([
wrappedScopedClusterClient.getMetrics(),
wrappedSearchSourceClient.getMetrics(),
]);
ruleRunMetricsStore.setNumSearches(searchMetrics.numSearches);
ruleRunMetricsStore.setTotalSearchDurationMs(searchMetrics.totalSearchDurationMs);
ruleRunMetricsStore.setEsSearchDurationMs(searchMetrics.esSearchDurationMs);
const { newAlerts, activeAlerts, recoveredAlerts } = processAlerts<
State,
Context,
ActionGroupIds,
RecoveryActionGroupId
>({
alerts,
existingAlerts: originalAlerts,
return {
originalAlerts: alertsCopy,
updatedRuleTypeState: updatedState || undefined,
hasReachedAlertLimit: alertFactory.hasReachedAlertLimit(),
};
});
const { activeAlerts, recoveredAlerts } = await this.timer.runWithTimer(
TaskRunnerTimerSpan.ProcessAlerts,
async () => {
const {
newAlerts: processedAlertsNew,
activeAlerts: processedAlertsActive,
recoveredAlerts: processedAlertsRecovered,
} = processAlerts<State, Context, ActionGroupIds, RecoveryActionGroupId>({
alerts: this.alerts,
existingAlerts: originalAlerts,
hasReachedAlertLimit,
alertLimit: this.maxAlerts,
});
logAlerts({
logger: this.logger,
alertingEventLogger: this.alertingEventLogger,
newAlerts,
activeAlerts,
recoveredAlerts,
newAlerts: processedAlertsNew,
activeAlerts: processedAlertsActive,
recoveredAlerts: processedAlertsRecovered,
ruleLogPrefix: ruleLabel,
ruleRunMetricsStore,
canSetRecoveryContext: ruleType.doesSetRecoveryContext ?? false,
shouldPersistAlerts: this.shouldLogAndScheduleActionsForAlerts(),
});
return {
newAlerts: processedAlertsNew,
activeAlerts: processedAlertsActive,
recoveredAlerts: processedAlertsRecovered,
};
}
);
await this.timer.runWithTimer(TaskRunnerTimerSpan.TriggerActions, async () => {
const executionHandler = this.getExecutionHandler(
ruleId,
rule.name,
rule.tags,
spaceId,
apiKey,
this.context.kibanaBaseUrl,
rule.actions,
rule.params,
fakeRequest
);
await rulesClient.clearExpiredSnoozes({ id: rule.id });
const ruleIsSnoozed = isRuleSnoozed(rule);
@ -468,6 +489,7 @@ export class TaskRunner<
this.countUsageOfActionExecutionAfterRuleCancellation();
}
}
});
const alertsToReturn: Record<string, RawAlertInstance> = {};
for (const id in activeAlerts) {
@ -483,53 +505,21 @@ export class TaskRunner<
};
}
private async loadRuleAttributesAndRun(): Promise<Resultable<RuleRunResult, Error>> {
const {
params: { alertId: ruleId, spaceId },
} = this.taskInstance;
const { rule, fakeRequest, apiKey, rulesClient, validatedParams } = await loadRule<Params>({
paramValidator: this.ruleType.validate?.params,
ruleId,
spaceId,
context: this.context,
ruleTypeRegistry: this.ruleTypeRegistry,
alertingEventLogger: this.alertingEventLogger,
});
if (apm.currentTransaction) {
apm.currentTransaction.name = `Execute Alerting Rule: "${rule.name}"`;
apm.currentTransaction.addLabels({
alerting_rule_consumer: rule.consumer,
alerting_rule_name: rule.name,
alerting_rule_tags: rule.tags.join(', '),
alerting_rule_type_id: rule.alertTypeId,
alerting_rule_params: JSON.stringify(rule.params),
});
}
return {
rulesClient: asOk(rulesClient),
monitoring: asOk(rule.monitoring),
stateWithMetrics: await promiseResult<RuleTaskStateAndMetrics, Error>(
this.executeRule(fakeRequest, rulesClient, rule, apiKey, validatedParams, spaceId)
),
schedule: asOk(
// fetch the rule again to ensure we return the correct schedule as it may have
// changed during the task execution
(await rulesClient.get({ id: ruleId })).schedule
),
};
}
async run(): Promise<RuleTaskRunResult> {
/**
* Initialize event logger, load and validate the rule
*/
private async prepareToRun() {
const {
params: { alertId: ruleId, spaceId, consumer },
startedAt,
state: originalState,
schedule: taskSchedule,
} = this.taskInstance;
if (apm.currentTransaction) {
apm.currentTransaction.name = `Execute Alerting Rule`;
apm.currentTransaction.addLabels({
alerting_rule_id: ruleId,
});
}
// Initially use consumer as stored inside the task instance
// Replace this with consumer as read from the rule saved object after
// we successfully read the rule SO. This allows us to populate a consumer
@ -542,17 +532,6 @@ export class TaskRunner<
this.ruleConsumer = consumer;
}
if (apm.currentTransaction) {
apm.currentTransaction.name = `Execute Alerting Rule`;
apm.currentTransaction.addLabels({
alerting_rule_id: ruleId,
});
}
const runDate = new Date();
const runDateString = runDate.toISOString();
this.logger.debug(`executing rule ${this.ruleType.id}:${ruleId} at ${runDateString}`);
const namespace = this.context.spaceIdToNamespace(spaceId);
this.alertingEventLogger.initialize({
@ -567,14 +546,30 @@ export class TaskRunner<
this.alertingEventLogger.start();
const { stateWithMetrics, schedule, monitoring } = await errorAsRuleTaskRunResult(
this.loadRuleAttributesAndRun()
);
return await loadRule<Params>({
paramValidator: this.ruleType.validate?.params,
ruleId,
spaceId,
context: this.context,
ruleTypeRegistry: this.ruleTypeRegistry,
alertingEventLogger: this.alertingEventLogger,
});
}
const ruleMonitoring =
resolveErr<RuleMonitoring | undefined, Error>(monitoring, () => {
return getDefaultRuleMonitoring();
}) ?? getDefaultRuleMonitoring();
private async processRunResults({
runDate,
stateWithMetrics,
monitoring,
}: {
runDate: Date;
stateWithMetrics: Result<RuleTaskStateAndMetrics, Error>;
monitoring: RuleMonitoring;
}) {
const {
params: { alertId: ruleId, spaceId },
} = this.taskInstance;
const namespace = this.context.spaceIdToNamespace(spaceId);
const { status: executionStatus, metrics: executionMetrics } = map<
RuleTaskStateAndMetrics,
@ -603,8 +598,6 @@ export class TaskRunner<
);
}
this.alertingEventLogger.done({ status: executionStatus, metrics: executionMetrics });
const monitoringHistory: RuleMonitoringHistory = {
success: true,
timestamp: +new Date(),
@ -626,10 +619,10 @@ export class TaskRunner<
monitoringHistory.success = false;
}
ruleMonitoring.execution.history.push(monitoringHistory);
ruleMonitoring.execution.calculated_metrics = {
success_ratio: getExecutionSuccessRatio(ruleMonitoring),
...getExecutionDurationPercentiles(ruleMonitoring),
monitoring.execution.history.push(monitoringHistory);
monitoring.execution.calculated_metrics = {
success_ratio: getExecutionSuccessRatio(monitoring),
...getExecutionDurationPercentiles(monitoring),
};
if (!this.cancelled) {
@ -644,10 +637,60 @@ export class TaskRunner<
);
await this.updateRuleSavedObject(ruleId, namespace, {
executionStatus: ruleExecutionStatusToRaw(executionStatus),
monitoring: ruleMonitoring,
monitoring,
});
}
return { executionStatus, executionMetrics };
}
async run(): Promise<RuleTaskRunResult> {
const {
params: { alertId: ruleId, spaceId },
startedAt,
state: originalState,
schedule: taskSchedule,
} = this.taskInstance;
const runDate = new Date();
this.logger.debug(`executing rule ${this.ruleType.id}:${ruleId} at ${runDate.toISOString()}`);
if (startedAt) {
// Capture how long it took for the rule to start running after being claimed
this.timer.setDuration(TaskRunnerTimerSpan.StartTaskRun, startedAt);
}
let stateWithMetrics: Result<RuleTaskStateAndMetrics, Error>;
let monitoring: RuleMonitoring = getDefaultRuleMonitoring();
let schedule: Result<IntervalSchedule, Error>;
try {
const preparedResult = await this.timer.runWithTimer(
TaskRunnerTimerSpan.PrepareRule,
async () => this.prepareToRun()
);
monitoring = preparedResult.rule.monitoring ?? getDefaultRuleMonitoring();
stateWithMetrics = asOk(await this.runRule(preparedResult));
// fetch the rule again to ensure we return the correct schedule as it may have
// changed during the task execution
schedule = asOk((await preparedResult.rulesClient.get({ id: ruleId })).schedule);
} catch (err) {
stateWithMetrics = asErr(err);
schedule = asErr(err);
}
const { executionStatus, executionMetrics } = await this.timer.runWithTimer(
TaskRunnerTimerSpan.ProcessRuleRun,
async () =>
this.processRunResults({
runDate,
stateWithMetrics,
monitoring,
})
);
const transformRunStateToTaskState = (
runStateWithMetrics: RuleTaskStateAndMetrics
): RuleTaskState => {
@ -657,6 +700,17 @@ export class TaskRunner<
};
};
if (startedAt) {
// Capture how long it took for the rule to run after being claimed
this.timer.setDuration(TaskRunnerTimerSpan.TotalRunDuration, startedAt);
}
this.alertingEventLogger.done({
status: executionStatus,
metrics: executionMetrics,
timings: this.timer.toJson(),
});
return {
state: map<RuleTaskStateAndMetrics, ElasticsearchError, RuleTaskState>(
stateWithMetrics,
@ -698,7 +752,7 @@ export class TaskRunner<
return { interval: retryInterval };
}),
monitoring: ruleMonitoring,
monitoring,
};
}
@ -749,22 +803,3 @@ export class TaskRunner<
});
}
}
/**
* If an error is thrown, wrap it in a RuleTaskRunResult
* so that we can treat each field independently
*/
async function errorAsRuleTaskRunResult(
future: Promise<Resultable<RuleRunResult, Error>>
): Promise<Resultable<RuleRunResult, Error>> {
try {
return await future;
} catch (e) {
return {
rulesClient: asErr(e),
stateWithMetrics: asErr(e),
schedule: asErr(e),
monitoring: asErr(e),
};
}
}


@ -465,6 +465,15 @@ describe('Task Runner Cancel', () => {
lastExecutionDate: new Date('1970-01-01T00:00:00.000Z'),
status,
},
timings: {
claim_to_start_duration_ms: 0,
prepare_rule_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
total_run_duration_ms: 0,
trigger_actions_duration_ms: 0,
},
});
expect(alertingEventLogger.setExecutionSucceeded).toHaveBeenCalledWith(


@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from '@kbn/core/server';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { TaskRunnerTimer, TaskRunnerTimerSpan } from './task_runner_timer';
const mockLogger = loggingSystemMock.create().get() as jest.Mocked<Logger>;
describe('TaskRunnerTimer', () => {
let timer: TaskRunnerTimer;
beforeEach(() => {
jest.resetAllMocks();
timer = new TaskRunnerTimer({ logger: mockLogger });
});
describe('setDuration', () => {
beforeAll(() => {
jest.useFakeTimers('modern');
jest.setSystemTime(new Date('2020-03-09').getTime());
});
afterAll(() => {
jest.useRealTimers();
});
test('should calculate duration as now - given start date for given timer span', () => {
timer.setDuration(TaskRunnerTimerSpan.StartTaskRun, new Date('2020-03-06'));
expect(timer.toJson()).toEqual({
claim_to_start_duration_ms: 259200000,
prepare_rule_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
total_run_duration_ms: 0,
trigger_actions_duration_ms: 0,
});
});
test('should log warning and overwrite duration if called twice for same span', () => {
timer.setDuration(TaskRunnerTimerSpan.StartTaskRun, new Date('2020-03-06'));
timer.setDuration(TaskRunnerTimerSpan.StartTaskRun, new Date('2020-03-04'));
expect(mockLogger.warn).toHaveBeenCalledWith(
`Duration already exists for \"claim_to_start_duration_ms\" and will be overwritten`
);
expect(timer.toJson()).toEqual({
claim_to_start_duration_ms: 432000000,
prepare_rule_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
total_run_duration_ms: 0,
trigger_actions_duration_ms: 0,
});
});
});
describe('runWithTimer', () => {
test('should calculate time it takes to run callback function for a given timer span', async () => {
const result = await timer.runWithTimer(TaskRunnerTimerSpan.ProcessAlerts, async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
return 'done!';
});
expect(result).toEqual('done!');
expect(timer.toJson().process_alerts_duration_ms).toBeGreaterThan(2000);
});
test('should log warning and overwrite duration if called twice for same span', async () => {
await timer.runWithTimer(TaskRunnerTimerSpan.ProcessAlerts, async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
return 'done!';
});
await timer.runWithTimer(TaskRunnerTimerSpan.ProcessAlerts, async () => {
await new Promise((resolve) => setTimeout(resolve, 1000));
return 'done!';
});
expect(timer.toJson().process_alerts_duration_ms).toBeGreaterThan(1000);
expect(timer.toJson().process_alerts_duration_ms).toBeLessThan(2000);
});
});
});


@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from '@kbn/core/server';
export enum TaskRunnerTimerSpan {
StartTaskRun = 'claim_to_start_duration_ms',
TotalRunDuration = 'total_run_duration_ms',
PrepareRule = 'prepare_rule_duration_ms',
RuleTypeRun = 'rule_type_run_duration_ms',
ProcessAlerts = 'process_alerts_duration_ms',
TriggerActions = 'trigger_actions_duration_ms',
ProcessRuleRun = 'process_rule_duration_ms',
}
export type TaskRunnerTimings = Record<TaskRunnerTimerSpan, number>;
interface TaskRunnerTimerOpts {
logger: Logger;
}
export class TaskRunnerTimer {
private timings: Record<string, number> = {};
constructor(private readonly options: TaskRunnerTimerOpts) {}
/**
* Calculate the time passed since a given start time and store this
* duration for the given name.
*/
public setDuration(name: TaskRunnerTimerSpan, start: Date) {
if (this.timings[name]) {
this.options.logger.warn(`Duration already exists for "${name}" and will be overwritten`);
}
// Calculate duration in millis from start until now and store
this.timings[name] = new Date().getTime() - start.getTime();
}
public async runWithTimer<T>(name: TaskRunnerTimerSpan, cb: () => Promise<T>): Promise<T> {
if (this.timings[name]) {
this.options.logger.warn(`Duration already exists for "${name}" and will be overwritten`);
}
const start = new Date();
const result = await cb();
const end = new Date();
this.timings[name] = end.getTime() - start.getTime();
return result;
}
public toJson(): TaskRunnerTimings {
return {
[TaskRunnerTimerSpan.StartTaskRun]: this.timings[TaskRunnerTimerSpan.StartTaskRun] ?? 0,
[TaskRunnerTimerSpan.TotalRunDuration]:
this.timings[TaskRunnerTimerSpan.TotalRunDuration] ?? 0,
[TaskRunnerTimerSpan.PrepareRule]: this.timings[TaskRunnerTimerSpan.PrepareRule] ?? 0,
[TaskRunnerTimerSpan.RuleTypeRun]: this.timings[TaskRunnerTimerSpan.RuleTypeRun] ?? 0,
[TaskRunnerTimerSpan.ProcessAlerts]: this.timings[TaskRunnerTimerSpan.ProcessAlerts] ?? 0,
[TaskRunnerTimerSpan.TriggerActions]: this.timings[TaskRunnerTimerSpan.TriggerActions] ?? 0,
[TaskRunnerTimerSpan.ProcessRuleRun]: this.timings[TaskRunnerTimerSpan.ProcessRuleRun] ?? 0,
};
}
}


@ -19,6 +19,7 @@ import {
IntervalSchedule,
RuleMonitoring,
RuleTaskState,
SanitizedRule,
} from '../../common';
import { Alert } from '../alert';
import { NormalizedRuleType } from '../rule_type_registry';
@ -44,6 +45,14 @@ export type RuleRunResult = Pick<RuleTaskRunResult, 'monitoring' | 'schedule'> &
stateWithMetrics: RuleTaskStateAndMetrics;
};
export interface RunRuleParams<Params extends RuleTypeParams> {
fakeRequest: KibanaRequest;
rulesClient: RulesClientApi;
rule: SanitizedRule<Params>;
apiKey: RawRule['apiKey'];
validatedParams: Params;
}
export interface RuleTaskInstance extends ConcreteTaskInstance {
state: RuleTaskState;
}


@ -326,6 +326,27 @@
},
"execution_gap_duration_s": {
"type": "long"
},
"rule_type_run_duration_ms": {
"type": "long"
},
"process_alerts_duration_ms": {
"type": "long"
},
"trigger_actions_duration_ms": {
"type": "long"
},
"process_rule_duration_ms": {
"type": "long"
},
"claim_to_start_duration_ms": {
"type": "long"
},
"prepare_rule_duration_ms": {
"type": "long"
},
"total_run_duration_ms": {
"type": "long"
}
}
}
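For reference, the new long fields sit alongside the existing execution metrics in the event document under kibana.alert.rule.execution.metrics. The fragment below is purely illustrative; the duration values are invented and do not come from a real run.

// Illustrative execute-event fragment; duration values are invented.
const exampleEventFragment = {
  kibana: {
    alert: {
      rule: {
        execution: {
          metrics: {
            claim_to_start_duration_ms: 12,
            prepare_rule_duration_ms: 34,
            rule_type_run_duration_ms: 156,
            process_alerts_duration_ms: 8,
            trigger_actions_duration_ms: 21,
            process_rule_duration_ms: 5,
            total_run_duration_ms: 236,
          },
        },
      },
    },
  },
};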


@ -143,6 +143,13 @@ export const EventSchema = schema.maybe(
es_search_duration_ms: ecsStringOrNumber(),
total_search_duration_ms: ecsStringOrNumber(),
execution_gap_duration_s: ecsStringOrNumber(),
rule_type_run_duration_ms: ecsStringOrNumber(),
process_alerts_duration_ms: ecsStringOrNumber(),
trigger_actions_duration_ms: ecsStringOrNumber(),
process_rule_duration_ms: ecsStringOrNumber(),
claim_to_start_duration_ms: ecsStringOrNumber(),
prepare_rule_duration_ms: ecsStringOrNumber(),
total_run_duration_ms: ecsStringOrNumber(),
})
),
})


@ -109,6 +109,27 @@ exports.EcsCustomPropertyMappings = {
execution_gap_duration_s: {
type: 'long',
},
rule_type_run_duration_ms: {
type: 'long',
},
process_alerts_duration_ms: {
type: 'long',
},
trigger_actions_duration_ms: {
type: 'long',
},
process_rule_duration_ms: {
type: 'long',
},
claim_to_start_duration_ms: {
type: 'long',
},
prepare_rule_duration_ms: {
type: 'long',
},
total_run_duration_ms: {
type: 'long',
},
},
},
},


@ -1332,6 +1332,28 @@ instanceStateValue: true
expect(event?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.new).to.be(1);
expect(event?.kibana?.alert?.rule?.execution?.metrics?.alert_counts?.recovered).to.be(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.claim_to_start_duration_ms
).to.be.greaterThan(0);
expect(event?.kibana?.alert?.rule?.execution?.metrics?.total_run_duration_ms).to.be.greaterThan(
0
);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.prepare_rule_duration_ms
).to.be.greaterThan(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.rule_type_run_duration_ms
).to.be.greaterThan(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.process_alerts_duration_ms
).to.be.greaterThan(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.trigger_actions_duration_ms
).to.be.greaterThan(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.process_rule_duration_ms
).to.be.greaterThan(0);
expect(event?.rule).to.eql({
id: alertId,
license: 'basic',


@ -417,6 +417,28 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
// Total search duration should be greater since it includes any network latency
expect(totalSearchDuration! - esSearchDuration! > 0).to.be(true);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.claim_to_start_duration_ms
).to.be.greaterThan(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.total_run_duration_ms
).to.be.greaterThan(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.prepare_rule_duration_ms
).to.be.greaterThan(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.rule_type_run_duration_ms
).to.be.greaterThan(0);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.process_alerts_duration_ms! >= 0
).to.be(true);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.trigger_actions_duration_ms! >= 0
).to.be(true);
expect(
event?.kibana?.alert?.rule?.execution?.metrics?.process_rule_duration_ms
).to.be.greaterThan(0);
break;
// this will get triggered as we add new event actions
default: