[Response Ops][Alerting] Adding more granular apm spans to alerting task runner for better traceability (#186427)

Resolves https://github.com/elastic/kibana/issues/185873

## Summary

Added more granular APM spans during rule execution, so each phase of the alerting task runner (fetching the decrypted rule, loading rules settings and maintenance windows, initializing the alerts client, building executor services, running the rule type executor, processing and persisting alerts, and enqueueing actions) shows up as its own span in the transaction trace.
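
All of the new spans are created through a small `withAlertingSpan` helper (added in this PR, shown in the diff below), so they share a common span type and a `plugin: alerting` label. A minimal sketch of the pattern, where `fetchQueryDelaySettings` and `exampleRuleRunStep` are hypothetical stand-ins for the real call sites:

```typescript
import { withSpan } from '@kbn/apm-utils';

// Helper added in this PR: every alerting span gets the same `rule run` type
// and a `plugin: alerting` label so the spans can be filtered together in APM.
async function withAlertingSpan<T>(name: string, cb: () => Promise<T>): Promise<T> {
  return withSpan({ name, type: 'rule run', labels: { plugin: 'alerting' } }, cb);
}

// Hypothetical stand-in for one of the wrapped steps; the real call sites wrap
// things like rules settings fetches, the rule type executor, and alert persistence.
async function fetchQueryDelaySettings(): Promise<{ delay: number }> {
  return { delay: 0 };
}

export async function exampleRuleRunStep() {
  // Each wrapped step shows up as a child span of the active rule-run
  // transaction, named after the step (e.g. `alerting:get-query-delay-settings`).
  return await withAlertingSpan('alerting:get-query-delay-settings', () =>
    fetchQueryDelaySettings()
  );
}
```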

## To Verify
Add the following to your Kibana config:

```yaml
elastic.apm:
  active: true
  environment: 'ying-test-185873'
  transactionSampleRate: 1.0
  breakdownMetrics: true
  spanStackTraceMinDuration: 10ms
  # Disables Kibana RUM
  servicesOverrides.kibana-frontend.active: false
```

This will push APM transaction data to
`https://kibana-cloud-apm.elastic.dev/`, where you can see what the new
spans look like. Create an alerting rule that generates alerts, add some
actions and summary actions to it, and let it run. Then open the
transactions in the cloud APM cluster, make sure the correct environment
is selected, and view the transaction for `Execute Alerting Rule ${ruleName}`.

<img width="1380" alt="Screenshot 2024-06-20 at 10 09 30 AM"
src="519b06eb-0b5f-4550-9f32-c71559d61757">
<img width="1347" alt="Screenshot 2024-06-20 at 10 09 38 AM"
src="c3b7242e-9930-4bbe-b392-82bc3732c0a8">


@@ -57,6 +57,7 @@ import {
} from './rule_action_helper';
import { RULE_SAVED_OBJECT_TYPE } from '../saved_objects';
import { ConnectorAdapter } from '../connector_adapters/types';
import { withAlertingSpan } from './lib';
enum Reasons {
MUTED = 'muted',
@@ -353,7 +354,9 @@ export class ExecutionHandler<
for (const c of chunk(bulkActions, CHUNK_SIZE)) {
let enqueueResponse;
try {
enqueueResponse = await this.actionsClient!.bulkEnqueueExecution(c);
enqueueResponse = await withAlertingSpan('alerting:bulk-enqueue-actions', () =>
this.actionsClient!.bulkEnqueueExecution(c)
);
} catch (e) {
if (e.statusCode === 404) {
throw createTaskRunError(e, TaskErrorSource.USER);
@@ -904,7 +907,9 @@ export class ExecutionHandler<
let alerts;
try {
alerts = await this.alertsClient.getSummarizedAlerts!(options);
alerts = await withAlertingSpan(`alerting:get-summarized-alerts-${action.uuid}`, () =>
this.alertsClient.getSummarizedAlerts!(options)
);
} catch (e) {
throw createTaskRunError(e, TaskErrorSource.FRAMEWORK);
}


@@ -25,6 +25,7 @@ import {
import { RuleMonitoringService } from '../monitoring/rule_monitoring_service';
import { RuleResultService } from '../monitoring/rule_result_service';
import { PublicRuleMonitoringService, PublicRuleResultService } from '../types';
import { withAlertingSpan } from './lib';
import { TaskRunnerContext } from './types';
interface GetExecutorServicesOpts {
@@ -65,7 +66,9 @@ export const getExecutorServices = async (opts: GetExecutorServicesOpts) => {
scopedClusterClient,
});
const searchSourceClient = await context.data.search.searchSource.asScoped(fakeRequest);
const searchSourceClient = await withAlertingSpan('alerting:get-search-source-client', () =>
context.data.search.searchSource.asScoped(fakeRequest)
);
const wrappedSearchSourceClient = wrapSearchSourceClient({
...wrappedClientOptions,
searchSourceClient,
@@ -75,9 +78,11 @@
includedHiddenTypes: [RULE_SAVED_OBJECT_TYPE, 'action'],
});
const dataViews = await context.dataViews.dataViewsServiceFactory(
savedObjectsClient,
scopedClusterClient.asInternalUser
const dataViews = await withAlertingSpan('alerting:get-data-views-factory', () =>
context.dataViews.dataViewsServiceFactory(
savedObjectsClient,
scopedClusterClient.asInternalUser
)
);
const uiSettingsClient = context.uiSettings.asScopedToClient(savedObjectsClient);


@@ -7,3 +7,4 @@
export { partiallyUpdateAdHocRun } from './partially_update_ad_hoc_run';
export { processRunResults } from './process_run_result';
export { withAlertingSpan } from './with_alerting_span';


@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { withSpan } from '@kbn/apm-utils';
export async function withAlertingSpan<T>(name: string, cb: () => Promise<T>): Promise<T> {
return withSpan({ name, type: 'rule run', labels: { plugin: 'alerting' } }, cb);
}


@@ -30,6 +30,7 @@ import {
import { ExecutorServices } from './get_executor_services';
import { TaskRunnerTimer, TaskRunnerTimerSpan } from './task_runner_timer';
import { RuleRunnerErrorStackTraceLog, RuleTypeRunnerContext, TaskRunnerContext } from './types';
import { withAlertingSpan } from './lib';
interface ConstructorOpts<
Params extends RuleTypeParams,
@@ -211,67 +212,69 @@
context.namespace ?? DEFAULT_NAMESPACE_STRING
}] namespace`,
};
executorResult = await this.options.context.executionContext.withContext(ctx, () =>
ruleType.executor({
executionId,
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
dataViews: executorServices.dataViews,
ruleMonitoringService: executorServices.ruleMonitoringService,
ruleResultService: executorServices.ruleResultService,
savedObjectsClient: executorServices.savedObjectsClient,
scopedClusterClient: executorServices.wrappedScopedClusterClient.client(),
searchSourceClient: executorServices.wrappedSearchSourceClient.searchSourceClient,
share: this.options.context.share,
shouldStopExecution: () => this.cancelled,
shouldWriteAlerts: () =>
this.shouldLogAndScheduleActionsForAlerts(ruleType.cancelAlertsOnRuleTimeout),
uiSettingsClient: executorServices.uiSettingsClient,
},
params: validatedParams,
state: ruleTypeState as RuleState,
startedAtOverridden,
startedAt,
previousStartedAt: previousStartedAt ? new Date(previousStartedAt) : null,
spaceId: context.spaceId,
namespace: context.namespace,
rule: {
id: context.ruleId,
name,
tags,
consumer,
producer: ruleType.producer,
revision,
ruleTypeId,
ruleTypeName: ruleType.name,
enabled,
schedule,
actions,
createdBy,
updatedBy,
createdAt,
updatedAt,
throttle,
notifyWhen,
muteAll,
snoozeSchedule,
alertDelay,
},
logger: this.options.logger,
flappingSettings: context.flappingSettings ?? DEFAULT_FLAPPING_SETTINGS,
// passed in so the rule registry knows about maintenance windows
...(maintenanceWindowsWithoutScopedQueryIds.length
? { maintenanceWindowIds: maintenanceWindowsWithoutScopedQueryIds }
: {}),
getTimeRange: (timeWindow) =>
getTimeRange({
logger: this.options.logger,
window: timeWindow,
...(context.queryDelaySec ? { queryDelay: context.queryDelaySec } : {}),
...(startedAtOverridden ? { forceNow: startedAt } : {}),
}),
})
executorResult = await withAlertingSpan('rule-type-executor', () =>
this.options.context.executionContext.withContext(ctx, () =>
ruleType.executor({
executionId,
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
dataViews: executorServices.dataViews,
ruleMonitoringService: executorServices.ruleMonitoringService,
ruleResultService: executorServices.ruleResultService,
savedObjectsClient: executorServices.savedObjectsClient,
scopedClusterClient: executorServices.wrappedScopedClusterClient.client(),
searchSourceClient: executorServices.wrappedSearchSourceClient.searchSourceClient,
share: this.options.context.share,
shouldStopExecution: () => this.cancelled,
shouldWriteAlerts: () =>
this.shouldLogAndScheduleActionsForAlerts(ruleType.cancelAlertsOnRuleTimeout),
uiSettingsClient: executorServices.uiSettingsClient,
},
params: validatedParams,
state: ruleTypeState as RuleState,
startedAtOverridden,
startedAt,
previousStartedAt: previousStartedAt ? new Date(previousStartedAt) : null,
spaceId: context.spaceId,
namespace: context.namespace,
rule: {
id: context.ruleId,
name,
tags,
consumer,
producer: ruleType.producer,
revision,
ruleTypeId,
ruleTypeName: ruleType.name,
enabled,
schedule,
actions,
createdBy,
updatedBy,
createdAt,
updatedAt,
throttle,
notifyWhen,
muteAll,
snoozeSchedule,
alertDelay,
},
logger: this.options.logger,
flappingSettings: context.flappingSettings ?? DEFAULT_FLAPPING_SETTINGS,
// passed in so the rule registry knows about maintenance windows
...(maintenanceWindowsWithoutScopedQueryIds.length
? { maintenanceWindowIds: maintenanceWindowsWithoutScopedQueryIds }
: {}),
getTimeRange: (timeWindow) =>
getTimeRange({
logger: this.options.logger,
window: timeWindow,
...(context.queryDelaySec ? { queryDelay: context.queryDelaySec } : {}),
...(startedAtOverridden ? { forceNow: startedAt } : {}),
}),
})
)
);
// Rule type execution has successfully completed
// Check that the rule type either never requested the max alerts limit
@@ -318,31 +321,35 @@
return { state: undefined, error, stackTrace };
}
await this.options.timer.runWithTimer(TaskRunnerTimerSpan.ProcessAlerts, async () => {
alertsClient.processAlerts({
flappingSettings: context.flappingSettings ?? DEFAULT_FLAPPING_SETTINGS,
maintenanceWindowIds: maintenanceWindowsWithoutScopedQueryIds,
alertDelay: alertDelay?.active ?? 0,
ruleRunMetricsStore: context.ruleRunMetricsStore,
});
});
await withAlertingSpan('alerting:process-alerts', () =>
this.options.timer.runWithTimer(TaskRunnerTimerSpan.ProcessAlerts, async () => {
alertsClient.processAlerts({
flappingSettings: context.flappingSettings ?? DEFAULT_FLAPPING_SETTINGS,
maintenanceWindowIds: maintenanceWindowsWithoutScopedQueryIds,
alertDelay: alertDelay?.active ?? 0,
ruleRunMetricsStore: context.ruleRunMetricsStore,
});
})
);
await this.options.timer.runWithTimer(TaskRunnerTimerSpan.PersistAlerts, async () => {
const updateAlertsMaintenanceWindowResult = await alertsClient.persistAlerts(
maintenanceWindows
);
// Set the event log MW ids again, this time including the ids that matched alerts with
// scoped query
if (
updateAlertsMaintenanceWindowResult?.maintenanceWindowIds &&
updateAlertsMaintenanceWindowResult?.maintenanceWindowIds.length > 0
) {
context.alertingEventLogger.setMaintenanceWindowIds(
updateAlertsMaintenanceWindowResult.maintenanceWindowIds
await withAlertingSpan('alerting:index-alerts-as-data', () =>
this.options.timer.runWithTimer(TaskRunnerTimerSpan.PersistAlerts, async () => {
const updateAlertsMaintenanceWindowResult = await alertsClient.persistAlerts(
maintenanceWindows
);
}
});
// Set the event log MW ids again, this time including the ids that matched alerts with
// scoped query
if (
updateAlertsMaintenanceWindowResult?.maintenanceWindowIds &&
updateAlertsMaintenanceWindowResult?.maintenanceWindowIds.length > 0
) {
context.alertingEventLogger.setMaintenanceWindowIds(
updateAlertsMaintenanceWindowResult.maintenanceWindowIds
);
}
})
);
alertsClient.logAlerts({
eventLogger: context.alertingEventLogger,


@@ -68,7 +68,7 @@ import { MaintenanceWindow } from '../application/maintenance_window/types';
import { filterMaintenanceWindowsIds, getMaintenanceWindows } from './get_maintenance_windows';
import { RuleTypeRunner } from './rule_type_runner';
import { initializeAlertsClient } from '../alerts_client';
import { processRunResults } from './lib';
import { withAlertingSpan, processRunResults } from './lib';
const FALLBACK_RETRY_INTERVAL = '5m';
const CONNECTIVITY_RETRY_INTERVAL = '5m';
@@ -294,11 +294,16 @@
const rulesSettingsClient = this.context.getRulesSettingsClientWithRequest(fakeRequest);
const ruleRunMetricsStore = new RuleRunMetricsStore();
const ruleLabel = `${this.ruleType.id}:${ruleId}: '${rule.name}'`;
const queryDelay = await rulesSettingsClient.queryDelay().get();
const queryDelay = await withAlertingSpan('alerting:get-query-delay-settings', () =>
rulesSettingsClient.queryDelay().get()
);
const flappingSettings = await withAlertingSpan('alerting:get-flapping-settings', () =>
rulesSettingsClient.flapping().get()
);
const ruleTypeRunnerContext = {
alertingEventLogger: this.alertingEventLogger,
flappingSettings: await rulesSettingsClient.flapping().get(),
flappingSettings,
namespace: this.context.spaceIdToNamespace(spaceId),
queryDelaySec: queryDelay.delay,
ruleId,
@@ -306,47 +311,51 @@
ruleRunMetricsStore,
spaceId,
};
const alertsClient = await initializeAlertsClient<
Params,
AlertData,
AlertState,
Context,
ActionGroupIds,
RecoveryActionGroupId
>({
alertsService: this.context.alertsService,
context: ruleTypeRunnerContext,
executionId: this.executionId,
logger: this.logger,
maxAlerts: this.context.maxAlerts,
rule: {
id: rule.id,
name: rule.name,
tags: rule.tags,
consumer: rule.consumer,
revision: rule.revision,
alertDelay: rule.alertDelay,
params: rule.params,
},
ruleType: this.ruleType as UntypedNormalizedRuleType,
startedAt: this.taskInstance.startedAt,
taskInstance: this.taskInstance,
});
const executorServices = await getExecutorServices({
context: this.context,
fakeRequest,
abortController: this.searchAbortController,
logger: this.logger,
ruleMonitoringService: this.ruleMonitoring,
ruleResultService: this.ruleResult,
ruleData: {
name: rule.name,
alertTypeId: rule.alertTypeId,
id: rule.id,
spaceId,
},
ruleTaskTimeout: this.ruleType.ruleTaskTimeout,
});
const alertsClient = await withAlertingSpan('alerting:initialize-alerts-client', () =>
initializeAlertsClient<
Params,
AlertData,
AlertState,
Context,
ActionGroupIds,
RecoveryActionGroupId
>({
alertsService: this.context.alertsService,
context: ruleTypeRunnerContext,
executionId: this.executionId,
logger: this.logger,
maxAlerts: this.context.maxAlerts,
rule: {
id: rule.id,
name: rule.name,
tags: rule.tags,
consumer: rule.consumer,
revision: rule.revision,
alertDelay: rule.alertDelay,
params: rule.params,
},
ruleType: this.ruleType as UntypedNormalizedRuleType,
startedAt: this.taskInstance.startedAt,
taskInstance: this.taskInstance,
})
);
const executorServices = await withAlertingSpan('alerting:get-executor-services', () =>
getExecutorServices({
context: this.context,
fakeRequest,
abortController: this.searchAbortController,
logger: this.logger,
ruleMonitoringService: this.ruleMonitoring,
ruleResultService: this.ruleResult,
ruleData: {
name: rule.name,
alertTypeId: rule.alertTypeId,
id: rule.id,
spaceId,
},
ruleTaskTimeout: this.ruleType.ruleTaskTimeout,
})
);
const {
state: updatedRuleTypeState,
@@ -391,21 +400,23 @@
let executionHandlerRunResult: RunResult = { throttledSummaryActions: {} };
await this.timer.runWithTimer(TaskRunnerTimerSpan.TriggerActions, async () => {
if (isRuleSnoozed(rule)) {
this.logger.debug(`no scheduling of actions for rule ${ruleLabel}: rule is snoozed.`);
} else if (!this.shouldLogAndScheduleActionsForAlerts()) {
this.logger.debug(
`no scheduling of actions for rule ${ruleLabel}: rule execution has been cancelled.`
);
this.countUsageOfActionExecutionAfterRuleCancellation();
} else {
executionHandlerRunResult = await executionHandler.run({
...alertsClient.getProcessedAlerts('activeCurrent'),
...alertsClient.getProcessedAlerts('recoveredCurrent'),
});
}
});
await withAlertingSpan('alerting:schedule-actions', () =>
this.timer.runWithTimer(TaskRunnerTimerSpan.TriggerActions, async () => {
if (isRuleSnoozed(rule)) {
this.logger.debug(`no scheduling of actions for rule ${ruleLabel}: rule is snoozed.`);
} else if (!this.shouldLogAndScheduleActionsForAlerts()) {
this.logger.debug(
`no scheduling of actions for rule ${ruleLabel}: rule execution has been cancelled.`
);
this.countUsageOfActionExecutionAfterRuleCancellation();
} else {
executionHandlerRunResult = await executionHandler.run({
...alertsClient.getProcessedAlerts('activeCurrent'),
...alertsClient.getProcessedAlerts('recoveredCurrent'),
});
}
})
);
let alertsToReturn: Record<string, RawAlertInstance> = {};
let recoveredAlertsToReturn: Record<string, RawAlertInstance> = {};
@@ -481,6 +492,7 @@
apm.currentTransaction.addLabels({
alerting_rule_space_id: spaceId,
alerting_rule_id: ruleId,
plugins: 'alerting',
});
}
@@ -495,7 +507,9 @@
this.timer.setDuration(TaskRunnerTimerSpan.StartTaskRun, startedAt);
}
const ruleData = await getDecryptedRule(this.context, ruleId, spaceId);
const ruleData = await withAlertingSpan('alerting:get-decrypted-rule', () =>
getDecryptedRule(this.context, ruleId, spaceId)
);
const runRuleParams = validateRuleAndCreateFakeRequest({
ruleData,
@@ -520,14 +534,16 @@
this.ruleMonitoring.setMonitoring(runRuleParams.rule.monitoring);
// Load the maintenance windows
this.maintenanceWindows = await getMaintenanceWindows({
context: this.context,
fakeRequest: runRuleParams.fakeRequest,
logger: this.logger,
ruleTypeId: this.ruleType.id,
ruleId,
ruleTypeCategory: this.ruleType.category,
});
this.maintenanceWindows = await withAlertingSpan('alerting:load-maintenance-windows', () =>
getMaintenanceWindows({
context: this.context,
fakeRequest: runRuleParams.fakeRequest,
logger: this.logger,
ruleTypeId: this.ruleType.id,
ruleId,
ruleTypeCategory: this.ruleType.category,
})
);
// Set the event log MW Id field the first time with MWs without scoped queries
this.maintenanceWindowsWithoutScopedQueryIds = filterMaintenanceWindowsIds({
@@ -655,7 +671,10 @@
let schedule: Result<IntervalSchedule, Error>;
try {
const validatedRuleData = await this.prepareToRun();
stateWithMetrics = asOk(await this.runRule(validatedRuleData));
stateWithMetrics = asOk(
await withAlertingSpan('alerting:run', () => this.runRule(validatedRuleData))
);
// fetch the rule again to ensure we return the correct schedule as it may have
// changed during the task execution
@@ -666,7 +685,9 @@
schedule = asErr(err);
}
await this.processRunResults({ schedule, stateWithMetrics });
await withAlertingSpan('alerting:process-run-results-and-update-rule', () =>
this.processRunResults({ schedule, stateWithMetrics })
);
const transformRunStateToTaskState = (
runStateWithMetrics: RuleTaskStateAndMetrics