Remove skipping task logic from Alerting plugin (#179065)

Towards: https://github.com/elastic/kibana/issues/176585

This PR is a follow-up to https://github.com/elastic/kibana/pull/177244
and removes the task-skipping logic from the Alerting plugin.

## To verify

Rules should run as expected, and `prepare_to_run_duration_ms` should no
longer appear in the event-log metrics.
The query below can be used to check the metrics.

```
GET /.kibana-event-log-*/_search
{
  "query": {
    "bool" : {
      "must" : [
        { "term" : { "event.action" : "execute" } },
        { "term" : { "event.provider" : "alerting" } }
      ]
    }
  },
  "size": 100
}
```
This commit is contained in:
Ersin Erdal 2024-03-22 16:41:25 +01:00 committed by GitHub
parent c0ad45f527
commit 7b2f0e2bab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 55 additions and 215 deletions

View file

@ -777,7 +777,6 @@ describe('AlertingEventLogger', () => {
[TaskRunnerTimerSpan.StartTaskRun]: 10,
[TaskRunnerTimerSpan.TotalRunDuration]: 20,
[TaskRunnerTimerSpan.PrepareRule]: 30,
[TaskRunnerTimerSpan.PrepareToRun]: 35,
[TaskRunnerTimerSpan.RuleTypeRun]: 40,
[TaskRunnerTimerSpan.ProcessAlerts]: 50,
[TaskRunnerTimerSpan.PersistAlerts]: 60,
@ -801,7 +800,6 @@ describe('AlertingEventLogger', () => {
claim_to_start_duration_ms: 10,
total_run_duration_ms: 20,
prepare_rule_duration_ms: 30,
prepare_to_run_duration_ms: 35,
rule_type_run_duration_ms: 40,
process_alerts_duration_ms: 50,
persist_alerts_duration_ms: 60,
@ -840,7 +838,6 @@ describe('AlertingEventLogger', () => {
[TaskRunnerTimerSpan.StartTaskRun]: 10,
[TaskRunnerTimerSpan.TotalRunDuration]: 20,
[TaskRunnerTimerSpan.PrepareRule]: 30,
[TaskRunnerTimerSpan.PrepareToRun]: 35,
[TaskRunnerTimerSpan.RuleTypeRun]: 40,
[TaskRunnerTimerSpan.ProcessAlerts]: 50,
[TaskRunnerTimerSpan.PersistAlerts]: 60,
@ -875,7 +872,6 @@ describe('AlertingEventLogger', () => {
claim_to_start_duration_ms: 10,
total_run_duration_ms: 20,
prepare_rule_duration_ms: 30,
prepare_to_run_duration_ms: 35,
rule_type_run_duration_ms: 40,
process_alerts_duration_ms: 50,
persist_alerts_duration_ms: 60,

View file

@ -39,7 +39,6 @@ import { AlertingRulesConfig } from '.';
import { AlertsService } from './alerts_service/alerts_service';
import { getRuleTypeIdValidLegacyConsumers } from './rule_type_registry_deprecated_consumers';
import { AlertingConfig } from './config';
import { rawRuleSchemaV1 } from './saved_objects/schemas/raw_rule';
export interface ConstructorOptions {
config: AlertingConfig;
@ -315,7 +314,6 @@ export class RuleTypeRegistry {
spaceId: schema.string(),
consumer: schema.maybe(schema.string()),
}),
indirectParamsSchema: rawRuleSchemaV1,
},
});

View file

@ -19,7 +19,7 @@ import { ruleTypeRegistryMock } from '../rule_type_registry.mock';
import { rulesClientMock } from '../rules_client.mock';
import { Rule } from '../types';
import { MONITORING_HISTORY_LIMIT, RuleExecutionStatusErrorReasons } from '../../common';
import { ErrorWithReason, getReasonFromError } from '../lib/error_with_reason';
import { getReasonFromError } from '../lib/error_with_reason';
import { mockedRawRuleSO, mockedRule } from './fixtures';
import { RULE_SAVED_OBJECT_TYPE } from '../saved_objects';
import { getErrorSource, TaskErrorSource } from '@kbn/task-manager-plugin/server/task_running';
@ -48,26 +48,16 @@ describe('rule_loader', () => {
bar: schema.boolean(),
});
const getDefaultValidateRuleParams = ({
error,
enabled: ruleEnabled = true,
}: {
error?: ErrorWithReason;
enabled?: boolean;
}) => ({
const getDefaultValidateRuleParams = (ruleEnabled: boolean = true) => ({
paramValidator,
ruleId,
spaceId,
ruleTypeRegistry,
ruleData: error
? { error }
: {
data: {
indirectParams: { ...mockedRawRuleSO.attributes, enabled: ruleEnabled },
version: '1',
references: [],
},
},
ruleData: {
rawRule: { ...mockedRawRuleSO.attributes, enabled: ruleEnabled },
version: '1',
references: [],
},
});
beforeEach(() => {
@ -92,7 +82,7 @@ describe('rule_loader', () => {
describe('succeeds', () => {
test('validates and returns the results', () => {
const result = validateRuleAndCreateFakeRequest({
...getDefaultValidateRuleParams({}),
...getDefaultValidateRuleParams(),
context,
});
@ -107,27 +97,11 @@ describe('rule_loader', () => {
});
});
test('throws when there is decrypt attributes error', () => {
let outcome = 'success';
try {
validateRuleAndCreateFakeRequest({
...getDefaultValidateRuleParams({
error: new ErrorWithReason(RuleExecutionStatusErrorReasons.Decrypt, new Error('test')),
}),
context,
});
} catch (err) {
outcome = 'failure';
expect(getReasonFromError(err)).toBe(RuleExecutionStatusErrorReasons.Decrypt);
}
expect(outcome).toBe('failure');
});
test('throws when rule is not enabled', async () => {
let outcome = 'success';
try {
validateRuleAndCreateFakeRequest({
...getDefaultValidateRuleParams({ enabled: false }),
...getDefaultValidateRuleParams(false),
context,
});
} catch (err) {
@ -146,7 +120,7 @@ describe('rule_loader', () => {
let outcome = 'success';
try {
validateRuleAndCreateFakeRequest({
...getDefaultValidateRuleParams({}),
...getDefaultValidateRuleParams(),
context,
});
} catch (err) {
@ -164,7 +138,7 @@ describe('rule_loader', () => {
let outcome = 'success';
try {
validateRuleAndCreateFakeRequest({
...getDefaultValidateRuleParams({}),
...getDefaultValidateRuleParams(),
context,
});
} catch (err) {
@ -182,7 +156,7 @@ describe('rule_loader', () => {
contextMock.spaceIdToNamespace.mockReturnValue(undefined);
const result = await getDecryptedRule(context, ruleId, 'default');
expect(result.indirectParams).toEqual({
expect(result.rawRule).toEqual({
...mockedRawRuleSO.attributes,
apiKey,
enabled,
@ -201,7 +175,7 @@ describe('rule_loader', () => {
const result = await getDecryptedRule(context, ruleId, spaceId);
expect(contextMock.spaceIdToNamespace.mock.calls[0]).toEqual([spaceId]);
expect(result.indirectParams).toEqual({
expect(result.rawRule).toEqual({
...mockedRawRuleSO.attributes,
apiKey,
enabled,

View file

@ -14,10 +14,6 @@ import {
SavedObjectReference,
SavedObjectsErrorHelpers,
} from '@kbn/core/server';
import {
LoadedIndirectParams,
LoadIndirectParamsResult,
} from '@kbn/task-manager-plugin/server/task';
import { createTaskRunError, TaskErrorSource } from '@kbn/task-manager-plugin/server';
import { RunRuleParams, TaskRunnerContext } from './types';
import { ErrorWithReason, validateRuleTypeParams } from '../lib';
@ -30,18 +26,16 @@ import {
import { MONITORING_HISTORY_LIMIT, RuleTypeParams } from '../../common';
import { RULE_SAVED_OBJECT_TYPE } from '../saved_objects';
export interface RuleData extends LoadedIndirectParams<RawRule> {
indirectParams: RawRule;
interface RuleData {
rawRule: RawRule;
version: string | undefined;
references: SavedObjectReference[];
}
export type RuleDataResult<T extends LoadedIndirectParams> = LoadIndirectParamsResult<T>;
interface ValidateRuleAndCreateFakeRequestParams<Params extends RuleTypeParams> {
context: TaskRunnerContext;
paramValidator?: RuleTypeParamsValidator<Params>;
ruleData: RuleDataResult<RuleData>;
ruleData: RuleData;
ruleId: string;
ruleTypeRegistry: RuleTypeRegistry;
spaceId: string;
@ -56,23 +50,16 @@ interface ValidateRuleAndCreateFakeRequestParams<Params extends RuleTypeParams>
export function validateRuleAndCreateFakeRequest<Params extends RuleTypeParams>(
params: ValidateRuleAndCreateFakeRequestParams<Params>
): RunRuleParams<Params> {
// If there was a prior error loading the decrypted rule SO, exit early
if (params.ruleData.error) {
throw params.ruleData.error;
}
const {
context,
paramValidator,
ruleData: {
data: { indirectParams, references, version },
},
ruleData: { rawRule, references, version },
ruleId,
ruleTypeRegistry,
spaceId,
} = params;
const { enabled, apiKey, alertTypeId: ruleTypeId } = indirectParams;
const { enabled, apiKey, alertTypeId: ruleTypeId } = rawRule;
if (!enabled) {
throw createTaskRunError(
@ -89,7 +76,7 @@ export function validateRuleAndCreateFakeRequest<Params extends RuleTypeParams>(
const rule = rulesClient.getAlertFromRaw({
id: ruleId,
ruleTypeId,
rawRule: indirectParams as RawRule,
rawRule,
references,
includeLegacyId: false,
omitGeneratedValues: false,
@ -150,15 +137,16 @@ export async function getDecryptedRule(
{ namespace }
);
} catch (e) {
const error = new ErrorWithReason(RuleExecutionStatusErrorReasons.Decrypt, e);
if (SavedObjectsErrorHelpers.isNotFoundError(e)) {
throw createTaskRunError(e, TaskErrorSource.USER);
throw createTaskRunError(error, TaskErrorSource.USER);
}
throw createTaskRunError(e, TaskErrorSource.FRAMEWORK);
throw createTaskRunError(error, TaskErrorSource.FRAMEWORK);
}
return {
version: rawRule.version,
indirectParams: rawRule.attributes,
rawRule: rawRule.attributes,
references: rawRule.references,
};
}

View file

@ -3285,52 +3285,6 @@ describe('Task Runner', () => {
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
});
test('loadIndirectParams Fetches the ruleData and returns the indirectParams', async () => {
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
const taskRunner = new TaskRunner({
ruleType,
taskInstance: {
...mockedTaskInstance,
state: {
...mockedTaskInstance.state,
previousStartedAt: new Date(Date.now() - 5 * 60 * 1000).toISOString(),
},
},
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
const result = await taskRunner.loadIndirectParams();
expect(encryptedSavedObjectsClient.getDecryptedAsInternalUser).toHaveBeenCalledTimes(1);
expect(result).toEqual({
data: expect.objectContaining({ indirectParams: mockedRawRuleSO.attributes }),
});
});
test('loadIndirectParams return error when cannot fetch the ruleData', async () => {
const error = new Error('test');
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockRejectedValueOnce(error);
const taskRunner = new TaskRunner({
ruleType,
taskInstance: {
...mockedTaskInstance,
state: {
...mockedTaskInstance.state,
previousStartedAt: new Date(Date.now() - 5 * 60 * 1000).toISOString(),
},
},
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
const result = await taskRunner.loadIndirectParams();
expect(encryptedSavedObjectsClient.getDecryptedAsInternalUser).toHaveBeenCalledTimes(1);
expect(result).toEqual({ error });
expect(getErrorSource(result.error as Error)).toBe(TaskErrorSource.FRAMEWORK);
});
function testAlertingEventLogCalls({
ruleContext = alertingEventLoggerInitializer,
activeAlerts = 0,
@ -3398,7 +3352,6 @@ describe('Task Runner', () => {
claim_to_start_duration_ms: 0,
persist_alerts_duration_ms: 0,
prepare_rule_duration_ms: 0,
prepare_to_run_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
@ -3434,7 +3387,6 @@ describe('Task Runner', () => {
claim_to_start_duration_ms: 0,
persist_alerts_duration_ms: 0,
prepare_rule_duration_ms: 0,
prepare_to_run_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
@ -3468,7 +3420,6 @@ describe('Task Runner', () => {
claim_to_start_duration_ms: 0,
persist_alerts_duration_ms: 0,
prepare_rule_duration_ms: 0,
prepare_to_run_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,

View file

@ -22,7 +22,6 @@ import { TaskRunnerContext } from './types';
import { getExecutorServices } from './get_executor_services';
import {
ElasticsearchError,
ErrorWithReason,
executionStatusFromError,
executionStatusFromState,
getNextRun,
@ -66,12 +65,7 @@ import {
import { IExecutionStatusAndMetrics } from '../lib/rule_execution_status';
import { RuleRunMetricsStore } from '../lib/rule_run_metrics_store';
import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger';
import {
getDecryptedRule,
RuleData,
RuleDataResult,
validateRuleAndCreateFakeRequest,
} from './rule_loader';
import { getDecryptedRule, validateRuleAndCreateFakeRequest } from './rule_loader';
import { TaskRunnerTimer, TaskRunnerTimerSpan } from './task_runner_timer';
import { RuleMonitoringService } from '../monitoring/rule_monitoring_service';
import { ILastRun, lastRunFromState, lastRunToRaw } from '../lib/last_run_status';
@ -151,7 +145,6 @@ export class TaskRunner<
private ruleMonitoring: RuleMonitoringService;
private ruleRunning: RunningHandler;
private ruleResult: RuleResultService;
private ruleData?: RuleDataResult<RuleData>;
private maintenanceWindows: MaintenanceWindow[] = [];
private maintenanceWindowsWithoutScopedQueryIds: string[] = [];
private ruleTypeRunner: RuleTypeRunner<
@ -442,12 +435,37 @@ export class TaskRunner<
* - clear expired snoozes
*/
private async prepareToRun(): Promise<RunRuleParams<Params>> {
return await this.timer.runWithTimer(TaskRunnerTimerSpan.PrepareToRun, async () => {
return await this.timer.runWithTimer(TaskRunnerTimerSpan.PrepareRule, async () => {
const {
params: { alertId: ruleId, spaceId },
params: { alertId: ruleId, spaceId, consumer },
startedAt,
} = this.taskInstance;
// Initially use consumer as stored inside the task instance
// This allows us to populate a consumer value for event log
// `execute-start` events (which are indexed before the rule SO is read)
// and in the event of decryption errors (where we cannot read the rule SO)
// Because "consumer" is set when a rule is created, this value should be static
// for the life of a rule but there may be edge cases where migrations cause
// the consumer values to become out of sync.
if (consumer) {
this.ruleConsumer = consumer;
}
// Start the event logger so that something is logged in the
// event that rule SO decryption fails.
const namespace = this.context.spaceIdToNamespace(spaceId);
this.alertingEventLogger.initialize({
ruleId,
ruleType: this.ruleType as UntypedNormalizedRuleType,
consumer: this.ruleConsumer!,
spaceId,
executionId: this.executionId,
taskScheduledAt: this.taskInstance.scheduledAt,
...(namespace ? { namespace } : {}),
});
this.alertingEventLogger.start(this.runDate);
if (apm.currentTransaction) {
apm.currentTransaction.name = `Execute Alerting Rule`;
apm.currentTransaction.addLabels({
@ -466,13 +484,10 @@ export class TaskRunner<
this.timer.setDuration(TaskRunnerTimerSpan.StartTaskRun, startedAt);
}
// Load rule SO (indirect task params) if necessary
if (!this.ruleData) {
this.ruleData = await this.loadIndirectParams();
}
const ruleData = await getDecryptedRule(this.context, ruleId, spaceId);
const runRuleParams = validateRuleAndCreateFakeRequest({
ruleData: this.ruleData,
ruleData,
paramValidator: this.ruleType.validate.params,
ruleId,
spaceId,
@ -651,53 +666,8 @@ export class TaskRunner<
});
}
async loadIndirectParams(): Promise<RuleDataResult<RuleData>> {
this.runDate = new Date();
return await this.timer.runWithTimer(TaskRunnerTimerSpan.PrepareRule, async () => {
try {
const {
params: { alertId: ruleId, spaceId, consumer },
} = this.taskInstance;
// Initially use consumer as stored inside the task instance
// This allows us to populate a consumer value for event log
// `execute-start` events (which are indexed before the rule SO is read)
// and in the event of decryption errors (where we cannot read the rule SO)
// Because "consumer" is set when a rule is created, this value should be static
// for the life of a rule but there may be edge cases where migrations cause
// the consumer values to become out of sync.
if (consumer) {
this.ruleConsumer = consumer;
}
// Start the event logger so that something is logged in the
// event that rule SO decryption fails.
const namespace = this.context.spaceIdToNamespace(spaceId);
this.alertingEventLogger.initialize({
ruleId,
ruleType: this.ruleType as UntypedNormalizedRuleType,
consumer: this.ruleConsumer!,
spaceId,
executionId: this.executionId,
taskScheduledAt: this.taskInstance.scheduledAt,
...(namespace ? { namespace } : {}),
});
this.alertingEventLogger.start(this.runDate);
const data = await getDecryptedRule(this.context, ruleId, spaceId);
this.ruleData = { data };
} catch (err) {
const error = createTaskRunError(
new ErrorWithReason(RuleExecutionStatusErrorReasons.Decrypt, err),
getErrorSource(err)
);
this.ruleData = { error };
}
return this.ruleData;
});
}
async run(): Promise<RuleTaskRunResult> {
this.runDate = new Date();
const {
params: { alertId: ruleId, spaceId },
startedAt,
@ -714,7 +684,7 @@ export class TaskRunner<
// fetch the rule again to ensure we return the correct schedule as it may have
// changed during the task execution
const data = await getDecryptedRule(this.context, ruleId, spaceId);
schedule = asOk(data.indirectParams.schedule);
schedule = asOk(data.rawRule.schedule);
} catch (err) {
stateWithMetrics = asErr(err);
schedule = asErr(err);

View file

@ -531,7 +531,6 @@ describe('Task Runner Cancel', () => {
claim_to_start_duration_ms: 0,
persist_alerts_duration_ms: 0,
prepare_rule_duration_ms: 0,
prepare_to_run_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,

View file

@ -34,7 +34,6 @@ describe('TaskRunnerTimer', () => {
claim_to_start_duration_ms: 259200000,
persist_alerts_duration_ms: 0,
prepare_rule_duration_ms: 0,
prepare_to_run_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,
@ -53,7 +52,6 @@ describe('TaskRunnerTimer', () => {
claim_to_start_duration_ms: 432000000,
persist_alerts_duration_ms: 0,
prepare_rule_duration_ms: 0,
prepare_to_run_duration_ms: 0,
process_alerts_duration_ms: 0,
process_rule_duration_ms: 0,
rule_type_run_duration_ms: 0,

View file

@ -10,7 +10,6 @@ import { Logger } from '@kbn/core/server';
export enum TaskRunnerTimerSpan {
StartTaskRun = 'claim_to_start_duration_ms',
TotalRunDuration = 'total_run_duration_ms',
PrepareToRun = 'prepare_to_run_duration_ms',
PrepareRule = 'prepare_rule_duration_ms',
RuleTypeRun = 'rule_type_run_duration_ms',
ProcessAlerts = 'process_alerts_duration_ms',
@ -61,7 +60,6 @@ export class TaskRunnerTimer {
[TaskRunnerTimerSpan.StartTaskRun]: this.timings[TaskRunnerTimerSpan.StartTaskRun] ?? 0,
[TaskRunnerTimerSpan.TotalRunDuration]:
this.timings[TaskRunnerTimerSpan.TotalRunDuration] ?? 0,
[TaskRunnerTimerSpan.PrepareToRun]: this.timings[TaskRunnerTimerSpan.PrepareToRun] ?? 0,
[TaskRunnerTimerSpan.PrepareRule]: this.timings[TaskRunnerTimerSpan.PrepareRule] ?? 0,
[TaskRunnerTimerSpan.RuleTypeRun]: this.timings[TaskRunnerTimerSpan.RuleTypeRun] ?? 0,
[TaskRunnerTimerSpan.ProcessAlerts]: this.timings[TaskRunnerTimerSpan.ProcessAlerts] ?? 0,

View file

@ -398,9 +398,6 @@
"prepare_rule_duration_ms": {
"type": "long"
},
"prepare_to_run_duration_ms": {
"type": "long"
},
"total_run_duration_ms": {
"type": "long"
},

View file

@ -175,7 +175,6 @@ export const EventSchema = schema.maybe(
claim_to_start_duration_ms: ecsStringOrNumber(),
persist_alerts_duration_ms: ecsStringOrNumber(),
prepare_rule_duration_ms: ecsStringOrNumber(),
prepare_to_run_duration_ms: ecsStringOrNumber(),
total_run_duration_ms: ecsStringOrNumber(),
total_enrichment_duration_ms: ecsStringOrNumber(),
})

View file

@ -173,9 +173,6 @@ exports.EcsCustomPropertyMappings = {
prepare_rule_duration_ms: {
type: 'long',
},
prepare_to_run_duration_ms: {
type: 'long',
},
total_run_duration_ms: {
type: 'long',
},

View file

@ -21,7 +21,6 @@ export type {
TaskRunCreatorFunction,
RunContext,
IntervalSchedule,
LoadIndirectParamsResult,
} from './task';
export { TaskStatus, TaskPriority } from './task';

View file

@ -95,29 +95,9 @@ export interface FailedTaskResult {
status: TaskStatus.Failed | TaskStatus.DeadLetter;
}
type IndirectParamsType = Record<string, unknown>;
export interface LoadedIndirectParams<
IndirectParams extends IndirectParamsType = IndirectParamsType
> {
[key: string]: unknown;
indirectParams: IndirectParams;
}
export type LoadIndirectParamsResult<T extends LoadedIndirectParams = LoadedIndirectParams> =
| {
data: T;
error?: never;
}
| {
data?: never;
error: Error;
};
export type LoadIndirectParamsFunction = () => Promise<LoadIndirectParamsResult>;
export type RunFunction = () => Promise<RunResult | undefined | void>;
export type CancelFunction = () => Promise<RunResult | undefined | void>;
export interface CancellableTask<T = never> {
loadIndirectParams?: LoadIndirectParamsFunction;
run: RunFunction;
cancel?: CancelFunction;
cleanup?: () => Promise<void>;
@ -184,8 +164,6 @@ export const taskDefinitionSchema = schema.object(
),
paramsSchema: schema.maybe(schema.any()),
// schema of the data fetched by the task runner (in loadIndirectParams) e.g. rule, action etc.
indirectParamsSchema: schema.maybe(schema.any()),
},
{
validate({ timeout, priority }) {
@ -220,7 +198,6 @@ export type TaskDefinition = Omit<TypeOf<typeof taskDefinitionSchema>, 'paramsSc
}
>;
paramsSchema?: ObjectType;
indirectParamsSchema?: ObjectType;
};
export enum TaskStatus {

View file

@ -82,7 +82,6 @@ export interface TaskRegisterDefinition {
>;
paramsSchema?: ObjectType;
indirectParamsSchema?: ObjectType;
}
/**