Add an API endpoint to bulk fill rule gaps (#220866)

## Summary
Original issue: https://github.com/elastic/security-team/issues/10688
This PR adds a new function to the rules client called
`bulkFillGapsByRuleIds`, that takes a list of rule ids and a date range,
and attempts to fill out the gaps simultaneously, 10 rules at a time
(can be customizable).

For all rules we verify if the user has access to write to the gaps
object, then for each rule we fetch gaps and trigger the backfilling of
its unfilled intervals.

During the execution, we aggregate the errors and the rules that are
skipped in lists and return them along with the scheduling outcomes.

### Error handling
There are 2 error types that are handled throughout the scheduling of a
gaps backfilling for a rule:
- A user doesn't have access the rule
- We fail to backfill gaps

### Skipped rules
A rule can be skipped when it doesn't have unfilled gaps in the given
time range, or when gaps are found, but they are currently "in
progress".

### Endpoint
This PR also updates the existing bulk actions endpoint by adding the
action `fill_gaps`. The endpoint for now should limit the amount of
rules per call to 100.

The bulk actions endpoint returns a summary of the execution which is a
list of counters of rules that succeeded, failed or were skipped.

## How to test?
1. Create several rules that run every 5 seconds and disable them. Leave
them disabled for a couple of minutes and then re-enable them in order
to create some gaps for it. You can confirm that there are gaps by
clicking on the rule, then on the "Execution results" tab, and then
scrolling down to "Gaps".
Alternatively you can use [this
tool](https://github.com/elastic/security-documents-generator) to create
1 rule (5m) with 1000 gaps:
```
yarn start rules --rules 1 -g 1000 -c -i"5m"
```
2. Get the ids of each rule that you created and call the endpoint to
backfill them. You can select a time range that is smaller than the gaps
so that you can call it several times. Here is an example of a curl to
do this
```
curl -X POST "http://localhost:5601/api/detection_engine/rules/_bulk_action?dry_run=false" \
  -H "Accept: */*" \
  -H "Content-Type: application/json" \
  -H "kbn-version: 9.1.0" \
  -H "kbn-build-number: 9007199254740991" \
  -H "elastic-api-version: 2023-10-31" \
  -H "Authorization: Basic $(echo -n 'elastic:changeme' | base64)" \
  --data-raw '{
    "action": "fill_gaps",
    "ids": ["307bdea8-28be-419f-bb25-dac0024f32af"],
    "fill_gaps": {
      "start_date": "2025-05-09T08:12:09.457Z",
      "end_date": "2025-05-09T09:12:09.457Z"
    }
  }'
```
3. Then you can go to the "Gaps" tab from step 1 and verify that the
gaps are being filled. Additionally, you can see that there are manual
runs scheduled for the rule in order to fill the gaps.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Edgar Santos 2025-06-18 23:21:15 +02:00 committed by GitHub
parent 575e80bccc
commit 9566f1a7b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
45 changed files with 2876 additions and 255 deletions

View file

@ -8394,6 +8394,17 @@ paths:
timestamp: '2023-10-31T00:00:00.000Z'
ids:
- 9e946bfc-3118-4c77-bb25-67d781191921
example27:
description: The following request triggers the filling of gaps for the specified rule ids and time range
summary: Fill Gaps - Manually trigger the filling of gaps for specified rules
value:
action: fill_gaps
ids:
- 748694f0-6977-4ea5-8384-cd2e39730779
- 164d0918-f720-4c9f-9f5c-c5122587cf19
run:
end_date: '2025-03-10T23:59:59.999Z'
start_date: '2025-03-01T00:00:00.000Z'
schema:
oneOf:
- $ref: '#/components/schemas/Security_Detections_API_BulkDeleteRules'
@ -8402,6 +8413,7 @@ paths:
- $ref: '#/components/schemas/Security_Detections_API_BulkExportRules'
- $ref: '#/components/schemas/Security_Detections_API_BulkDuplicateRules'
- $ref: '#/components/schemas/Security_Detections_API_BulkManualRuleRun'
- $ref: '#/components/schemas/Security_Detections_API_BulkManualRuleFillGaps'
- $ref: '#/components/schemas/Security_Detections_API_BulkEditRules'
responses:
'200':
@ -59042,6 +59054,7 @@ components:
- ESQL_INDEX_PATTERN
- MANUAL_RULE_RUN_FEATURE
- MANUAL_RULE_RUN_DISABLED_RULE
- RULE_FILL_GAPS_DISABLED_RULE
type: string
Security_Detections_API_BulkActionSkipResult:
type: object
@ -59051,7 +59064,9 @@ components:
name:
type: string
skip_reason:
$ref: '#/components/schemas/Security_Detections_API_BulkEditSkipReason'
oneOf:
- $ref: '#/components/schemas/Security_Detections_API_BulkEditSkipReason'
- $ref: '#/components/schemas/Security_Detections_API_BulkGapsFillingSkipReason'
required:
- id
- skip_reason
@ -59293,6 +59308,48 @@ components:
type: string
required:
- action
Security_Detections_API_BulkGapsFillingSkipReason:
enum:
- NO_GAPS_TO_FILL
type: string
Security_Detections_API_BulkManualRuleFillGaps:
type: object
properties:
action:
enum:
- fill_gaps
type: string
fill_gaps:
description: Object that describes applying a manual gap fill action for the specified time range.
type: object
properties:
end_date:
description: End date of the manual gap fill
type: string
start_date:
description: Start date of the manual gap fill
type: string
required:
- start_date
- end_date
gaps_range_end:
description: Gaps range end, valid only when query is provided
type: string
gaps_range_start:
description: Gaps range start, valid only when query is provided
type: string
ids:
description: Array of rule IDs. Array of rule IDs to which a bulk action will be applied. Only valid when query property is undefined.
items:
type: string
minItems: 1
type: array
query:
description: Query to filter rules.
type: string
required:
- action
- fill_gaps
Security_Detections_API_BulkManualRuleRun:
type: object
properties:

View file

@ -10069,6 +10069,17 @@ paths:
timestamp: '2023-10-31T00:00:00.000Z'
ids:
- 9e946bfc-3118-4c77-bb25-67d781191921
example27:
description: The following request triggers the filling of gaps for the specified rule ids and time range
summary: Fill Gaps - Manually trigger the filling of gaps for specified rules
value:
action: fill_gaps
ids:
- 748694f0-6977-4ea5-8384-cd2e39730779
- 164d0918-f720-4c9f-9f5c-c5122587cf19
run:
end_date: '2025-03-10T23:59:59.999Z'
start_date: '2025-03-01T00:00:00.000Z'
schema:
oneOf:
- $ref: '#/components/schemas/Security_Detections_API_BulkDeleteRules'
@ -10077,6 +10088,7 @@ paths:
- $ref: '#/components/schemas/Security_Detections_API_BulkExportRules'
- $ref: '#/components/schemas/Security_Detections_API_BulkDuplicateRules'
- $ref: '#/components/schemas/Security_Detections_API_BulkManualRuleRun'
- $ref: '#/components/schemas/Security_Detections_API_BulkManualRuleFillGaps'
- $ref: '#/components/schemas/Security_Detections_API_BulkEditRules'
responses:
'200':
@ -68414,6 +68426,7 @@ components:
- ESQL_INDEX_PATTERN
- MANUAL_RULE_RUN_FEATURE
- MANUAL_RULE_RUN_DISABLED_RULE
- RULE_FILL_GAPS_DISABLED_RULE
type: string
Security_Detections_API_BulkActionSkipResult:
type: object
@ -68423,7 +68436,9 @@ components:
name:
type: string
skip_reason:
$ref: '#/components/schemas/Security_Detections_API_BulkEditSkipReason'
oneOf:
- $ref: '#/components/schemas/Security_Detections_API_BulkEditSkipReason'
- $ref: '#/components/schemas/Security_Detections_API_BulkGapsFillingSkipReason'
required:
- id
- skip_reason
@ -68665,6 +68680,48 @@ components:
type: string
required:
- action
Security_Detections_API_BulkGapsFillingSkipReason:
enum:
- NO_GAPS_TO_FILL
type: string
Security_Detections_API_BulkManualRuleFillGaps:
type: object
properties:
action:
enum:
- fill_gaps
type: string
fill_gaps:
description: Object that describes applying a manual gap fill action for the specified time range.
type: object
properties:
end_date:
description: End date of the manual gap fill
type: string
start_date:
description: Start date of the manual gap fill
type: string
required:
- start_date
- end_date
gaps_range_end:
description: Gaps range end, valid only when query is provided
type: string
gaps_range_start:
description: Gaps range start, valid only when query is provided
type: string
ids:
description: Array of rule IDs. Array of rule IDs to which a bulk action will be applied. Only valid when query property is undefined.
items:
type: string
minItems: 1
type: array
query:
description: Query to filter rules.
type: string
required:
- action
- fill_gaps
Security_Detections_API_BulkManualRuleRun:
type: object
properties:

View file

@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Rule } from './rule';
export type BulkEditSkipReason = 'RULE_NOT_MODIFIED';
export type BulkGapsFillingSkipReason = 'NO_GAPS_TO_FILL';
type AllowedSkipReason = BulkEditSkipReason | BulkGapsFillingSkipReason;
interface SkipResult<SkipReason extends AllowedSkipReason> {
id: Rule['id'];
name?: Rule['name'];
skip_reason: SkipReason;
}
export type BulkEditActionSkipResult = SkipResult<BulkEditSkipReason>;
export type BulkGapsFillingSkipResult = SkipResult<BulkGapsFillingSkipReason>;
export type BulkActionSkipResult = BulkEditActionSkipResult | BulkGapsFillingSkipResult;

View file

@ -120,7 +120,13 @@ export type {
DefaultActionGroupId,
} from './builtin_action_groups';
export { getBuiltinActionGroups, RecoveredActionGroup } from './builtin_action_groups';
export type { BulkEditSkipReason, BulkActionSkipResult } from './bulk_edit';
export type {
BulkEditSkipReason,
BulkGapsFillingSkipReason,
BulkEditActionSkipResult,
BulkGapsFillingSkipResult,
BulkActionSkipResult,
} from './bulk_action';
export {
DisabledActionTypeIdsForActionGroup,
isActionGroupDisabledForActionTypeId,

View file

@ -10,6 +10,7 @@ import Boom from '@hapi/boom';
import type { KueryNode } from '@kbn/es-query';
import { nodeBuilder } from '@kbn/es-query';
import type { SavedObjectsFindResult } from '@kbn/core/server';
import type { Gap } from '../../../../lib/rule_gaps/gap';
import { RULE_SAVED_OBJECT_TYPE } from '../../../../saved_objects';
import { findRulesSo } from '../../../../data/rule';
import {
@ -31,7 +32,8 @@ import type { RawRule } from '../../../../types';
export async function scheduleBackfill(
context: RulesClientContext,
params: ScheduleBackfillParams
params: ScheduleBackfillParams,
gaps?: Gap[]
): Promise<ScheduleBackfillResults> {
try {
scheduleBackfillParamsSchema.validate(params);
@ -163,6 +165,7 @@ export async function scheduleBackfill(
(connectorId: string) => actionsClient.isSystemAction(connectorId)
);
}),
gaps,
ruleTypeRegistry: context.ruleTypeRegistry,
spaceId: context.spaceId,
unsecuredSavedObjectsClient: context.unsecuredSavedObjectsClient,

View file

@ -19,7 +19,7 @@ import type {
import { validateAndAuthorizeSystemActions } from '../../../../lib/validate_authorize_system_actions';
import type { Rule, RuleAction, RuleSystemAction } from '../../../../../common';
import { RULE_SAVED_OBJECT_TYPE } from '../../../../saved_objects';
import type { BulkActionSkipResult } from '../../../../../common/bulk_edit';
import type { BulkEditActionSkipResult } from '../../../../../common/bulk_action';
import type { RuleTypeRegistry } from '../../../../types';
import {
validateRuleTypeParams,
@ -107,7 +107,7 @@ type RuleType = ReturnType<RuleTypeRegistry['get']>;
// TODO (http-versioning): This should be of type Rule, change this when all rule types are fixed
export interface BulkEditResult<Params extends RuleParams> {
rules: Array<SanitizedRule<Params>>;
skipped: BulkActionSkipResult[];
skipped: BulkEditActionSkipResult[];
errors: BulkOperationError[];
total: number;
}
@ -284,7 +284,7 @@ async function bulkEditRulesOcc<Params extends RuleParams>(
rules: Array<SavedObjectsBulkUpdateObject<RawRule>>;
resultSavedObjects: Array<SavedObjectsUpdateResponse<RawRule>>;
errors: BulkOperationError[];
skipped: BulkActionSkipResult[];
skipped: BulkEditActionSkipResult[];
}> {
const rulesFinder =
await context.encryptedSavedObjectsClient.createPointInTimeFinderDecryptedAsInternalUser<RawRule>(
@ -297,7 +297,7 @@ async function bulkEditRulesOcc<Params extends RuleParams>(
);
const rules: Array<SavedObjectsBulkUpdateObject<RawRule>> = [];
const skipped: BulkActionSkipResult[] = [];
const skipped: BulkEditActionSkipResult[] = [];
const errors: BulkOperationError[] = [];
const apiKeysMap: ApiKeysMap = new Map();
const username = await context.getUserName();
@ -444,7 +444,7 @@ async function updateRuleAttributesAndParamsInMemory<Params extends RuleParams>(
paramsModifier?: ParamsModifier<Params>;
apiKeysMap: ApiKeysMap;
rules: Array<SavedObjectsBulkUpdateObject<RawRule>>;
skipped: BulkActionSkipResult[];
skipped: BulkEditActionSkipResult[];
errors: BulkOperationError[];
username: string | null;
shouldIncrementRevision?: ShouldIncrementRevision<Params>;

View file

@ -0,0 +1,165 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Gap } from '../../../../lib/rule_gaps/gap';
import { processAllGapsInTimeRange } from '../../../../lib/rule_gaps/process_all_gaps_in_time_range';
import { batchBackfillRuleGaps } from './batch_backfill_rule_gaps';
import { rulesClientContextMock } from '../../../../rules_client/rules_client.mock';
import type { BulkGapFillError } from './utils';
import { BulkFillGapsScheduleResult, BulkGapsFillStep } from './types';
import { processGapsBatch } from './process_gaps_batch';
jest.mock('./process_gaps_batch', () => {
return {
processGapsBatch: jest.fn(),
};
});
const processGapsBatchMock = processGapsBatch as jest.Mock;
jest.mock('../../../../lib/rule_gaps/process_all_gaps_in_time_range', () => {
return {
processAllGapsInTimeRange: jest.fn(),
};
});
const processAllGapsInTimeRangeMock = processAllGapsInTimeRange as jest.Mock;
jest.mock('../../../backfill/methods/schedule', () => {
return {
scheduleBackfill: jest.fn(),
};
});
describe('batchBackfillRuleGaps', () => {
const context = rulesClientContextMock.create();
const currentLogger = context.logger;
(context.logger.get as jest.Mock).mockImplementation(() => currentLogger);
const rule = { id: 'some-rule-id', name: 'some-rule-name' };
const backfillingDateRange = {
start: '2025-05-09T09:15:09.457Z',
end: '2025-05-20T09:24:09.457Z',
};
const range = (start: string, end: string) => ({ gte: new Date(start), lte: new Date(end) });
const createGap = (unfilledIntervals: Array<ReturnType<typeof range>>): Gap => {
return {
unfilledIntervals,
} as unknown as Gap;
};
const gapsBatches = [
[
createGap([range('2025-05-10T09:15:09.457Z', '2025-05-11T09:15:09.457Z')]),
createGap([range('2025-05-12T09:15:09.457Z', '2025-05-13T09:15:09.457Z')]),
],
[createGap([range('2025-05-13T09:15:09.457Z', '2025-05-14T09:15:09.457Z')])],
];
let result: Awaited<ReturnType<typeof batchBackfillRuleGaps>>;
beforeEach(() => {
jest.resetAllMocks();
});
const callBatchBackfillRuleGaps = async () => {
result = await batchBackfillRuleGaps(context, {
rule,
range: backfillingDateRange,
maxGapCountPerRule: 1000,
});
};
beforeEach(() => {
processGapsBatchMock.mockResolvedValue(true);
processAllGapsInTimeRangeMock.mockImplementation(async ({ processGapsBatch: processFn }) => {
const results: Awaited<ReturnType<typeof processFn>> = [];
for (const batch of gapsBatches) {
results.push(await processFn(batch));
}
return results;
});
});
describe('when there are gaps to backfill', () => {
beforeEach(async () => {
await callBatchBackfillRuleGaps();
});
it('should trigger the batch fetching of the gaps of the rule correctly', () => {
expect(processAllGapsInTimeRangeMock).toHaveBeenCalledTimes(1);
expect(processAllGapsInTimeRangeMock).toHaveBeenCalledWith({
logger: context.logger.get('gaps'),
options: {
maxFetchedGaps: 1000,
},
processGapsBatch: expect.any(Function),
ruleId: rule.id,
...backfillingDateRange,
});
});
it('should call processGapsBatch for each fetched gap batch', () => {
expect(processGapsBatchMock).toHaveBeenCalledTimes(gapsBatches.length);
gapsBatches.forEach((batch, idx) => {
const callOrder = idx + 1;
expect(processGapsBatchMock).toHaveBeenNthCalledWith(callOrder, context, {
rule,
range: backfillingDateRange,
gapsBatch: batch,
});
});
});
it('should return a successful outcome', () => {
expect(result.outcome).toEqual(BulkFillGapsScheduleResult.BACKFILLED);
});
});
describe('when there are errors processing gaps', () => {
const getErrorFromResult = () => {
return (result as { outcome: string; error: BulkGapFillError }).error;
};
it('should propagate the error when processAllGapsInTimeRange errors', async () => {
processAllGapsInTimeRangeMock.mockRejectedValueOnce(
new Error('processAllGapsInTimeRange failed')
);
await callBatchBackfillRuleGaps();
expect(result.outcome).toEqual(BulkFillGapsScheduleResult.ERRORED);
expect(getErrorFromResult()).toEqual({
errorMessage: 'processAllGapsInTimeRange failed',
rule,
step: BulkGapsFillStep.SCHEDULING,
});
});
it('should propagate the error when the processGapsBatch returns an error', async () => {
const errorMessage = 'error when calling processGapsBatch';
processGapsBatchMock.mockRejectedValueOnce(new Error(errorMessage));
await callBatchBackfillRuleGaps();
expect(result.outcome).toEqual(BulkFillGapsScheduleResult.ERRORED);
expect(getErrorFromResult()).toEqual({
errorMessage,
rule,
step: BulkGapsFillStep.SCHEDULING,
});
});
});
describe('when there is nothing to backfill', () => {
beforeEach(async () => {
processGapsBatchMock.mockResolvedValue(false);
await callBatchBackfillRuleGaps();
});
it('should return a result indicating that backfilling was skipped for this rule', () => {
expect(result.outcome).toEqual(BulkFillGapsScheduleResult.SKIPPED);
});
});
});

View file

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { BulkFillGapsByRuleIdsParams } from './types';
import { BulkFillGapsScheduleResult, BulkGapsFillStep } from './types';
import { processAllGapsInTimeRange } from '../../../../lib/rule_gaps/process_all_gaps_in_time_range';
import type { Gap } from '../../../../lib/rule_gaps/gap';
import type { BulkGapFillError } from './utils';
import { logProcessedAsAuditEvent, toBulkGapFillError } from './utils';
import type { RulesClientContext } from '../../../../rules_client';
import { processGapsBatch } from './process_gaps_batch';
interface BatchBackfillRuleGapsParams {
rule: { id: string; name: string };
range: BulkFillGapsByRuleIdsParams['range'];
maxGapCountPerRule: number;
}
type BatchBackfillScheduleRuleGapsResult =
| { outcome: BulkFillGapsScheduleResult.BACKFILLED }
| { outcome: BulkFillGapsScheduleResult.SKIPPED }
| { outcome: BulkFillGapsScheduleResult.ERRORED; error: BulkGapFillError };
export const batchBackfillRuleGaps = async (
context: RulesClientContext,
{ rule, range, maxGapCountPerRule }: BatchBackfillRuleGapsParams
): Promise<BatchBackfillScheduleRuleGapsResult> => {
const logger = context.logger.get('gaps');
const { start, end } = range;
let resultError: BulkGapFillError | undefined;
let hasBeenBackfilled = false;
const eventLogClient = await context.getEventLogClient();
try {
const processingResults = await processAllGapsInTimeRange({
ruleId: rule.id,
start,
end,
eventLogClient,
logger,
processGapsBatch: (gapsBatch: Gap[]) => processGapsBatch(context, { rule, range, gapsBatch }),
options: { maxFetchedGaps: maxGapCountPerRule },
});
hasBeenBackfilled = processingResults.some((result) => result);
} catch (error) {
logProcessedAsAuditEvent(context, rule, error);
resultError = toBulkGapFillError(rule, BulkGapsFillStep.SCHEDULING, error);
}
if (!resultError && !hasBeenBackfilled) {
return {
outcome: BulkFillGapsScheduleResult.SKIPPED,
};
}
if (resultError) {
return {
outcome: BulkFillGapsScheduleResult.ERRORED,
error: resultError,
};
}
return {
outcome: BulkFillGapsScheduleResult.BACKFILLED,
};
};

View file

@ -0,0 +1,234 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { rulesClientContextMock } from '../../../../rules_client/rules_client.mock';
import { bulkFillGapsByRuleIds } from './bulk_fill_gaps_by_rule_ids';
import {
BulkGapsFillStep,
type BulkFillGapsByRuleIdsParams,
type BulkFillGapsByRuleIdsResult,
BulkFillGapsScheduleResult,
} from './types';
import type { RulesClientContext } from '../../../../rules_client';
import { batchBackfillRuleGaps } from './batch_backfill_rule_gaps';
import { RuleAuditAction, ruleAuditEvent } from '../../../../rules_client/common/audit_events';
import { RULE_SAVED_OBJECT_TYPE } from '../get_rule_ids_with_gaps/get_rule_ids_with_gaps';
import { toBulkGapFillError } from './utils';
import { AlertingAuthorizationEntity, WriteOperations } from '../../../../authorization';
jest.mock('./batch_backfill_rule_gaps', () => {
return {
batchBackfillRuleGaps: jest.fn(),
};
});
const batchBackfillRuleGapsMock = batchBackfillRuleGaps as jest.Mock;
jest.mock('../../../../rules_client/common/audit_events', () => {
const actual = jest.requireActual('../../../../rules_client/common/audit_events');
return {
...actual,
ruleAuditEvent: jest.fn(),
};
});
const ruleAuditEventMock = ruleAuditEvent as jest.Mock;
const buildRule = (id: number): BulkFillGapsByRuleIdsParams['rules'][0] => {
return {
id: id.toString(),
name: `${id}-rule-name`,
consumer: `rule-consumer-${id % 2}`,
alertTypeId: `rule-alert-type-id-${id % 2}`,
};
};
const backfillRange = { start: '2025-05-09T09:15:09.457Z', end: '2025-05-09T09:24:09.457Z' };
const successfulRules = Array.from({ length: 3 }, (_, idx) =>
buildRule(idx)
) as BulkFillGapsByRuleIdsParams['rules'];
const erroredRuleAtAuthorization = buildRule(3);
erroredRuleAtAuthorization.alertTypeId = 'should-break';
erroredRuleAtAuthorization.consumer = 'should-break';
const skippedRule = buildRule(4);
const errroredRuleAtBackfilling = buildRule(5);
const allRules = [
...successfulRules,
erroredRuleAtAuthorization,
skippedRule,
errroredRuleAtBackfilling,
];
const rulesThatAttemptedToBackfill = [...successfulRules, skippedRule, errroredRuleAtBackfilling];
let rulesClientContext: RulesClientContext;
describe('bulkFillGapsByRuleIds', () => {
let refreshIndexMock: jest.Mock;
let results: BulkFillGapsByRuleIdsResult;
let ensuredAuthorizedMock: jest.Mock;
const authorizationError = new Error('error at authorization');
const schedulingError = new Error('error at scheduling');
beforeEach(async () => {
rulesClientContext = rulesClientContextMock.create();
const eventLogClientMock = rulesClientContext.getEventLogClient as jest.Mock;
refreshIndexMock = jest.fn();
eventLogClientMock.mockResolvedValue({ refreshIndex: refreshIndexMock });
ensuredAuthorizedMock = rulesClientContext.authorization.ensureAuthorized as jest.Mock;
ensuredAuthorizedMock.mockImplementation(async ({ ruleTypeId }) => {
if (ruleTypeId === erroredRuleAtAuthorization.alertTypeId) {
throw authorizationError;
}
});
batchBackfillRuleGapsMock.mockImplementation(async (_, { rule: { id: ruleId } }) => {
if (ruleId === skippedRule.id) {
return {
outcome: BulkFillGapsScheduleResult.SKIPPED,
};
}
if (ruleId === errroredRuleAtBackfilling.id) {
return {
outcome: BulkFillGapsScheduleResult.ERRORED,
error: toBulkGapFillError(
errroredRuleAtBackfilling,
BulkGapsFillStep.SCHEDULING,
schedulingError
),
};
}
return {
outcome: BulkFillGapsScheduleResult.BACKFILLED,
};
});
ruleAuditEventMock.mockImplementation((payload) => payload);
results = await bulkFillGapsByRuleIds(
rulesClientContext,
{
rules: allRules,
range: backfillRange,
},
{
maxGapCountPerRule: 1000,
}
);
});
afterEach(() => {
jest.resetAllMocks();
});
it('should ensure the user is authorized for each combination of ruleTypeId and consumer found in the rules', () => {
expect(ensuredAuthorizedMock).toHaveBeenCalledTimes(3);
expect(ensuredAuthorizedMock).toHaveBeenNthCalledWith(1, {
consumer: 'rule-consumer-0',
entity: AlertingAuthorizationEntity.Rule,
operation: WriteOperations.FillGaps,
ruleTypeId: 'rule-alert-type-id-0',
});
expect(ensuredAuthorizedMock).toHaveBeenNthCalledWith(2, {
consumer: 'rule-consumer-1',
entity: AlertingAuthorizationEntity.Rule,
operation: WriteOperations.FillGaps,
ruleTypeId: 'rule-alert-type-id-1',
});
expect(ensuredAuthorizedMock).toHaveBeenNthCalledWith(3, {
consumer: 'should-break',
entity: AlertingAuthorizationEntity.Rule,
operation: WriteOperations.FillGaps,
ruleTypeId: 'should-break',
});
});
it('should call batchBackfillRuleGaps correctly', () => {
rulesThatAttemptedToBackfill.forEach((rule) => {
expect(batchBackfillRuleGapsMock).toHaveBeenCalledWith(rulesClientContext, {
rule,
range: backfillRange,
maxGapCountPerRule: 1000,
});
});
});
it('should return a list with rules that succeeded', () => {
expect(results.backfilled).toHaveLength(successfulRules.length);
// the order in which they are processed is not guaranteed because we use pMap
successfulRules.forEach((successfulRule) => {
expect(results.backfilled).toContain(successfulRule);
});
});
it('should return a list with errored rules', () => {
expect(results.errored).toEqual([
toBulkGapFillError(
erroredRuleAtAuthorization,
BulkGapsFillStep.ACCESS_VALIDATION,
authorizationError
),
toBulkGapFillError(errroredRuleAtBackfilling, BulkGapsFillStep.SCHEDULING, schedulingError),
]);
});
it('should return a list with rules that were skipped', () => {
expect(results.skipped).toEqual([skippedRule]);
});
it('should log an audit event when the rule fails at the authorization step', () => {
const { id, name } = erroredRuleAtAuthorization;
expect(rulesClientContext.auditLogger?.log).toHaveBeenCalledWith({
action: RuleAuditAction.FILL_GAPS,
savedObject: { type: RULE_SAVED_OBJECT_TYPE, id, name },
error: authorizationError,
});
});
it('should refresh the index of the event client once after scheduling the backfilling of all gaps', () => {
expect(refreshIndexMock).toHaveBeenCalledTimes(1);
});
});
describe('validation', () => {
beforeEach(() => {
rulesClientContext = rulesClientContextMock.create();
});
const getCallBulkFillGaps = (range: { start: string; end: string }) => () =>
bulkFillGapsByRuleIds(rulesClientContext, { rules: [], range }, { maxGapCountPerRule: 1000 });
it('should throw an error if the start date is in the future', async () => {
const start = new Date(Date.now() + 1).toISOString();
await expect(getCallBulkFillGaps({ start, end: new Date().toISOString() })).rejects.toThrow();
});
it('should throw an error if the end date is in the future', async () => {
const end = new Date(Date.now() + 1).toISOString();
await expect(
getCallBulkFillGaps({ start: new Date(Date.now() - 1).toISOString(), end })
).rejects.toThrow();
});
it('should throw an error if the end date is not strictly greater than the start date', async () => {
const now = new Date();
await expect(
getCallBulkFillGaps({
start: now.toISOString(),
end: now.toISOString(),
})
).rejects.toThrow();
});
it('should throw an error if there are more than 90 days between the start and end dates', async () => {
await expect(
getCallBulkFillGaps({
start: new Date(Date.now() - 91 * 24 * 60 * 60 * 1000).toISOString(),
end: new Date().toISOString(),
})
).rejects.toThrow();
});
});

View file

@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import pMap from 'p-map';
import Boom from '@hapi/boom';
import { groupBy, mapValues } from 'lodash';
import type { RulesClientContext } from '../../../../rules_client';
import {
type BulkFillGapsByRuleIdsResult,
type BulkFillGapsByRuleIdsParams,
type BulkFillGapsByRuleIdsOptions,
BulkGapsFillStep,
BulkFillGapsScheduleResult,
} from './types';
import { logProcessedAsAuditEvent, toBulkGapFillError } from './utils';
import { AlertingAuthorizationEntity, WriteOperations } from '../../../../authorization';
import { batchBackfillRuleGaps } from './batch_backfill_rule_gaps';
import { validateBackfillSchedule } from '../../../../../common/lib';
const DEFAULT_MAX_BACKFILL_CONCURRENCY = 10;
export const bulkFillGapsByRuleIds = async (
context: RulesClientContext,
{ rules, range }: BulkFillGapsByRuleIdsParams,
options: BulkFillGapsByRuleIdsOptions
): Promise<BulkFillGapsByRuleIdsResult> => {
const errorString = validateBackfillSchedule(range.start, range.end);
if (errorString) {
throw Boom.badRequest(
`Error validating backfill schedule parameters "${JSON.stringify(range)}" - ${errorString}`
);
}
const errored: BulkFillGapsByRuleIdsResult['errored'] = [];
const skipped: BulkFillGapsByRuleIdsResult['skipped'] = [];
const backfilled: BulkFillGapsByRuleIdsParams['rules'] = [];
const eventLogClient = await context.getEventLogClient();
const maxBackfillConcurrency = Math.max(
options?.maxBackfillConcurrency ?? DEFAULT_MAX_BACKFILL_CONCURRENCY,
DEFAULT_MAX_BACKFILL_CONCURRENCY
);
// Make sure user has access to these rules
const rulesByRuleType = mapValues(
groupBy(rules, (rule) => `${rule.alertTypeId}<>${rule.consumer}`),
(groupedRules) => ({
ruleTypeId: groupedRules[0].alertTypeId,
consumer: groupedRules[0].consumer,
rules: groupedRules,
})
);
const authorizedRules: typeof rules = [];
for (const { ruleTypeId, consumer, rules: rulesBatch } of Object.values(rulesByRuleType)) {
try {
await context.authorization.ensureAuthorized({
ruleTypeId,
consumer,
operation: WriteOperations.FillGaps,
entity: AlertingAuthorizationEntity.Rule,
});
authorizedRules.push(...rulesBatch);
} catch (error) {
rulesBatch.forEach((rule) => {
logProcessedAsAuditEvent(context, { id: rule.id, name: rule.name }, error);
errored.push(toBulkGapFillError(rule, BulkGapsFillStep.ACCESS_VALIDATION, error));
return;
});
}
}
await pMap(
authorizedRules,
async (rule) => {
const backfillResult = await batchBackfillRuleGaps(context, {
rule,
range,
maxGapCountPerRule: options.maxGapCountPerRule,
});
switch (backfillResult.outcome) {
case BulkFillGapsScheduleResult.BACKFILLED:
backfilled.push(rule);
break;
case BulkFillGapsScheduleResult.ERRORED:
errored.push(backfillResult.error);
break;
case BulkFillGapsScheduleResult.SKIPPED:
skipped.push(rule);
break;
}
},
{
concurrency: maxBackfillConcurrency,
}
);
await eventLogClient.refreshIndex();
return { backfilled, skipped, errored };
};

View file

@ -5,12 +5,4 @@
* 2.0.
*/
import type { Rule } from './rule';
export type BulkEditSkipReason = 'RULE_NOT_MODIFIED';
export interface BulkActionSkipResult {
id: Rule['id'];
name?: Rule['name'];
skip_reason: BulkEditSkipReason;
}
export { bulkFillGapsByRuleIds } from './bulk_fill_gaps_by_rule_ids';

View file

@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Gap } from '../../../../lib/rule_gaps/gap';
import { scheduleBackfill } from '../../../backfill/methods/schedule';
import { rulesClientContextMock } from '../../../../rules_client/rules_client.mock';
import { processGapsBatch } from './process_gaps_batch';
jest.mock('../../../backfill/methods/schedule', () => {
return {
scheduleBackfill: jest.fn(),
};
});
const scheduleBackfillMock = scheduleBackfill as jest.Mock;
describe('processGapsBatch', () => {
const context = rulesClientContextMock.create();
const rule = { id: 'some-rule-id', name: 'some-rule-name' };
const backfillingDateRange = {
start: '2025-05-09T09:15:09.457Z',
end: '2025-05-20T09:24:09.457Z',
};
const range = (start: string, end: string) => ({ gte: new Date(start), lte: new Date(end) });
const createGap = (unfilledIntervals: Array<ReturnType<typeof range>>): Gap => {
return {
unfilledIntervals,
} as unknown as Gap;
};
const testBatch = [
createGap([range('2025-05-10T09:15:09.457Z', '2025-05-11T09:15:09.457Z')]),
createGap([range('2025-05-12T09:15:09.457Z', '2025-05-13T09:15:09.457Z')]),
];
const getGapScheduleRange = (gap: Gap) => {
return gap.unfilledIntervals.map(({ gte, lte }) => ({
start: gte.toISOString(),
end: lte.toISOString(),
}));
};
let result: Awaited<ReturnType<typeof processGapsBatch>>;
beforeEach(() => {
jest.resetAllMocks();
});
const callProcessGapsBatch = async (batch = testBatch, dateRange = backfillingDateRange) => {
result = await processGapsBatch(context, {
rule,
range: dateRange,
gapsBatch: batch,
});
};
describe('when there are gaps to backfill', () => {
beforeEach(async () => {
scheduleBackfillMock.mockResolvedValue([{ some: 'successful result' }]);
await callProcessGapsBatch();
});
it('should trigger the backfilling of each fetched gap batch', () => {
expect(scheduleBackfillMock).toHaveBeenCalledTimes(1);
expect(scheduleBackfillMock).toHaveBeenCalledWith(
context,
[
{
ruleId: rule.id,
ranges: testBatch.flatMap(getGapScheduleRange),
},
],
testBatch
);
});
it('should return true if it schedules a backfill', () => {
expect(result).toBe(true);
});
});
describe('when there are errors backfilling gaps', () => {
it('should propagate the error when the scheduleBackfill returns an unexpected amount of results', async () => {
scheduleBackfillMock.mockResolvedValueOnce([
{ some: 'result' },
{ some: 'other unexpected result' },
]);
await expect(callProcessGapsBatch()).rejects.toEqual(
new Error('Unexpected scheduling result count 2')
);
});
it('should propagate the error when the scheduleBackfill returns an error', async () => {
const errorMessage = 'something went wrong when backfilling';
scheduleBackfillMock.mockResolvedValueOnce([
{
error: {
message: errorMessage,
},
},
]);
await expect(callProcessGapsBatch()).rejects.toEqual(new Error(errorMessage));
});
});
describe('when called with an empty list of gaps', () => {
beforeEach(async () => {
await callProcessGapsBatch([]);
});
it('should not attempt to schedule backfills', () => {
expect(scheduleBackfillMock).not.toHaveBeenCalled();
});
it('should return false', () => {
expect(result).toBe(false);
});
});
describe('when the returned gaps are outside of the date range', () => {
const backfillingRange = {
start: '2025-05-10T09:15:09.457Z',
end: '2025-05-12T09:15:09.457Z',
};
const gapsBatchOutsideOfRange = [
createGap([range('2025-05-09T09:15:09.457Z', '2025-05-11T09:15:09.457Z')]),
createGap([range('2025-05-11T09:15:09.457Z', '2025-05-17T09:15:09.457Z')]),
];
const clampedGapsBatch = [
createGap([range(backfillingRange.start, '2025-05-11T09:15:09.457Z')]),
createGap([range('2025-05-11T09:15:09.457Z', backfillingRange.end)]),
];
beforeEach(async () => {
scheduleBackfillMock.mockResolvedValue([{ some: 'successful result' }]);
await callProcessGapsBatch(gapsBatchOutsideOfRange, backfillingRange);
});
it('should only schedule for gaps within the given time range', () => {
expect(scheduleBackfillMock).toHaveBeenCalledTimes(1);
expect(scheduleBackfillMock).toHaveBeenCalledWith(
context,
[
{
ruleId: rule.id,
ranges: clampedGapsBatch.flatMap(getGapScheduleRange),
},
],
gapsBatchOutsideOfRange
);
});
});
describe('when the unfilled intervals list is empty in all gaps', () => {
const gapsWithEmptyUnfilledIntervals = [createGap([]), createGap([])];
beforeEach(async () => {
await callProcessGapsBatch(gapsWithEmptyUnfilledIntervals);
});
it('should only schedule for gaps within the range', () => {
expect(scheduleBackfillMock).not.toHaveBeenCalled();
});
});
});

View file

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { clampIntervals } from '../../../../lib/rule_gaps/gap/interval_utils';
import type { BulkFillGapsByRuleIdsParams } from './types';
import type { Gap } from '../../../../lib/rule_gaps/gap';
import { scheduleBackfill } from '../../../backfill/methods/schedule';
import { logProcessedAsAuditEvent } from './utils';
import type { RulesClientContext } from '../../../../rules_client';
interface ProcessGapsBatchParams {
rule: { id: string; name: string };
range: BulkFillGapsByRuleIdsParams['range'];
gapsBatch: Gap[];
}
export const processGapsBatch = async (
context: RulesClientContext,
{ rule, range, gapsBatch }: ProcessGapsBatchParams
): Promise<boolean> => {
const { start, end } = range;
const gapRanges = gapsBatch.flatMap((gap) => {
const clampedIntervals = clampIntervals(gap.unfilledIntervals, {
gte: new Date(start),
lte: new Date(end),
});
return clampedIntervals.map(({ gte, lte }) => {
return {
start: gte.toISOString(),
end: lte.toISOString(),
};
});
});
// Rules might have gaps within the range that don't yield any schedulingPayload
// This can happen when they have gaps that are in an "in progress" state.
// They are returned still returned by the function that is querying gaps
if (gapRanges.length === 0) {
return false;
}
const schedulingPayload = {
ruleId: rule.id,
ranges: gapRanges,
};
const results = await scheduleBackfill(context, [schedulingPayload], gapsBatch);
if (results.length !== 1) {
throw new Error(`Unexpected scheduling result count ${results.length}`);
} else if ('error' in results[0]) {
const backfillError = results[0].error;
throw new Error(backfillError.message);
} else {
logProcessedAsAuditEvent(context, rule);
}
return true;
};

View file

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export enum BulkGapsFillStep {
ACCESS_VALIDATION = 'ACCESS_VALIDATION',
SCHEDULING = 'SCHEDULING',
}
export enum BulkFillGapsScheduleResult {
BACKFILLED = 'BACKFILLED',
SKIPPED = 'SKIPPED',
ERRORED = 'ERRORED',
}
export interface BulkGapFillingErroredRule {
rule: {
id: string;
name: string;
};
step: BulkGapsFillStep;
errorMessage: string;
}
interface RuleToBackfill {
id: string;
name: string;
alertTypeId: string;
consumer: string;
}
export interface BulkFillGapsByRuleIdsParams {
rules: RuleToBackfill[];
range: {
start: string;
end: string;
};
}
export interface BulkFillGapsByRuleIdsOptions {
maxGapCountPerRule: number;
maxBackfillConcurrency?: number;
}
export interface BulkFillGapsByRuleIdsResult {
backfilled: RuleToBackfill[];
skipped: RuleToBackfill[];
errored: BulkGapFillingErroredRule[];
}

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { BulkGapsFillStep } from './types';
import type { RulesClientContext } from '../../../../rules_client';
import type { RuleAuditEventParams } from '../../../../rules_client/common/audit_events';
import { ruleAuditEvent, RuleAuditAction } from '../../../../rules_client/common/audit_events';
import { RULE_SAVED_OBJECT_TYPE } from '../../../../saved_objects';
export type BulkGapFillError = ReturnType<typeof toBulkGapFillError>;
export const toBulkGapFillError = (
rule: { id: string; name: string },
step: BulkGapsFillStep,
error: Error
) => {
let fallbackMessage: string;
switch (step) {
case BulkGapsFillStep.SCHEDULING:
fallbackMessage = 'Error scheduling backfills';
break;
case BulkGapsFillStep.ACCESS_VALIDATION:
fallbackMessage = 'Error validating user access to the rule';
break;
}
return {
rule: {
id: rule.id,
name: rule.name,
},
step,
errorMessage: error?.message ?? fallbackMessage,
};
};
export const logProcessedAsAuditEvent = (
context: RulesClientContext,
{ id, name }: { id: string; name: string },
error?: Error
) => {
const payload: RuleAuditEventParams = {
action: RuleAuditAction.FILL_GAPS,
savedObject: { type: RULE_SAVED_OBJECT_TYPE, id, name },
};
if (error) {
payload.error = error;
}
context.auditLogger?.log(ruleAuditEvent(payload));
};

View file

@ -48,6 +48,7 @@ import { createBackfillError } from './lib';
import { updateGaps } from '../lib/rule_gaps/update/update_gaps';
import { denormalizeActions } from '../rules_client/lib/denormalize_actions';
import type { DenormalizedAction, NormalizedAlertActionWithGeneratedValues } from '../rules_client';
import type { Gap } from '../lib/rule_gaps/gap';
export const BACKFILL_TASK_TYPE = 'ad_hoc_run-backfill';
@ -69,6 +70,7 @@ interface BulkQueueOpts {
eventLogClient: IEventLogClient;
internalSavedObjectsRepository: ISavedObjectsRepository;
eventLogger: IEventLogger | undefined;
gaps?: Gap[];
}
interface DeleteBackfillForRulesOpts {
@ -106,6 +108,7 @@ export class BackfillClient {
eventLogClient,
internalSavedObjectsRepository,
eventLogger,
gaps,
}: BulkQueueOpts): Promise<ScheduleBackfillResults> {
const adHocSOsToCreate: Array<SavedObjectsBulkCreateObject<AdHocRunSO>> = [];
@ -297,6 +300,7 @@ export class BackfillClient {
logger: this.logger,
backfillClient: this,
actionsClient,
gaps,
})
)
);

View file

@ -15,6 +15,7 @@ import {
normalizeInterval,
denormalizeInterval,
clipInterval,
clampIntervals,
clipDateInterval,
} from './interval_utils';
import type { Interval, StringInterval } from '../types';
@ -262,6 +263,99 @@ describe('interval_utils', () => {
});
});
describe('clampIntervals', () => {
const toInterval = (start: string, end: string) => ({
gte: new Date(start),
lte: new Date(end),
});
const buildTestCase = (
testDescription: string,
intervals: Interval[],
boundary: Interval,
expectedResult: Interval[]
) => ({
testDescription,
intervals,
boundary,
expectedResult,
});
const testCases = [
buildTestCase(
'when the list of intervals is outside the range on the left, it should return an empty list',
[
toInterval('2025-05-09T09:12:09.457Z', '2025-05-09T09:13:09.457Z'),
toInterval('2025-05-09T09:14:09.457Z', '2025-05-09T09:15:09.457Z'),
],
toInterval('2025-05-09T09:15:09.457Z', '2025-05-09T09:17:09.457Z'),
[]
),
buildTestCase(
'when the list of intervals overlaps on the left, it should clamp the overlapping interval on the start',
[
toInterval('2025-05-09T09:12:09.457Z', '2025-05-09T09:13:09.457Z'),
toInterval('2025-05-09T09:14:09.457Z', '2025-05-09T09:15:09.457Z'),
],
toInterval('2025-05-09T09:14:50.457Z', '2025-05-09T09:17:09.457Z'),
[toInterval('2025-05-09T09:14:50.457Z', '2025-05-09T09:15:09.457Z')]
),
buildTestCase(
'when the list of intervals overlaps with the range on both left and right, it should clamp the overlapping intervals',
[
toInterval('2025-05-09T09:12:09.457Z', '2025-05-09T09:13:09.457Z'),
toInterval('2025-05-09T09:15:09.457Z', '2025-05-09T09:16:09.457Z'),
toInterval('2025-05-09T09:17:09.458Z', '2025-05-09T09:20:09.457Z'),
toInterval('2025-05-09T09:21:09.458Z', '2025-05-09T09:22:09.457Z'),
],
toInterval('2025-05-09T09:15:50.457Z', '2025-05-09T09:18:09.457Z'),
[
toInterval('2025-05-09T09:15:50.457Z', '2025-05-09T09:16:09.457Z'),
toInterval('2025-05-09T09:17:09.458Z', '2025-05-09T09:18:09.457Z'),
]
),
buildTestCase(
'when the list of intervals is included inside the range, it should not clamp anything',
[
toInterval('2025-05-09T09:12:09.457Z', '2025-05-09T09:13:09.457Z'),
toInterval('2025-05-09T09:14:09.457Z', '2025-05-09T09:15:09.457Z'),
],
toInterval('2025-05-09T08:11:50.457Z', '2025-05-09T09:17:09.457Z'),
[
toInterval('2025-05-09T09:12:09.457Z', '2025-05-09T09:13:09.457Z'),
toInterval('2025-05-09T09:14:09.457Z', '2025-05-09T09:15:09.457Z'),
]
),
buildTestCase(
'when the list of intervals overlaps on the right, it should clamp the overlapping interval on the end',
[
toInterval('2025-05-09T09:12:09.457Z', '2025-05-09T09:13:09.457Z'),
toInterval('2025-05-09T09:14:09.457Z', '2025-05-09T09:15:09.457Z'),
],
toInterval('2025-05-09T09:11:50.457Z', '2025-05-09T09:14:55.457Z'),
[
toInterval('2025-05-09T09:12:09.457Z', '2025-05-09T09:13:09.457Z'),
toInterval('2025-05-09T09:14:09.457Z', '2025-05-09T09:14:55.457Z'),
]
),
buildTestCase(
'when the list of intervals is outside the range on the right, it should return an empty list',
[
toInterval('2025-05-09T09:18:09.457Z', '2025-05-09T09:20:09.457Z'),
toInterval('2025-05-09T09:21:09.457Z', '2025-05-09T09:22:09.457Z'),
],
toInterval('2025-05-09T09:15:09.457Z', '2025-05-09T09:17:09.457Z'),
[]
),
];
testCases.forEach(({ testDescription, intervals, boundary, expectedResult }) => {
it(testDescription, () => {
expect(clampIntervals(intervals, boundary)).toEqual(expectedResult);
});
});
});
describe('clipDateInterval', () => {
it('clips date interval to boundary', () => {
const expected = {

View file

@ -236,3 +236,46 @@ export const clipDateInterval = (
return { start: new Date(clippedStart), end: new Date(clippedEnd) };
};
/**
* Clamps the list `intervals` (in ascending order) to the dates provided in the `range`
* @param intervals
* @param boundary
* @returns
*/
export const clampIntervals = (intervals: Interval[], boundary: Interval) => {
const clampedIntervals = [];
const { gte: start, lte: end } = boundary;
const boundaryStartTime = start.getTime();
const boundaryEndTime = end.getTime();
for (const { gte, lte } of intervals) {
const intervalStart = gte.getTime();
const intervalEnd = lte.getTime();
// If the interval ends before the range starts, skip it
if (intervalEnd < boundaryStartTime) {
continue;
}
// If the interval starts after the range ends, stop processing (since the list is sorted)
if (intervalStart > boundaryEndTime) {
break;
}
const clamped = {
gte: new Date(intervalStart < boundaryStartTime ? boundaryStartTime : intervalStart),
lte: new Date(intervalEnd > boundaryEndTime ? boundaryEndTime : intervalEnd),
};
// Atter clamping the intervals the limits cannot be the same
if (clamped.gte.getTime() >= clamped.lte.getTime()) {
continue;
}
// Clamp the interval to the range
clampedIntervals.push(clamped);
}
return clampedIntervals;
};

View file

@ -0,0 +1,324 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { loggerMock } from '@kbn/logging-mocks';
import { findGapsSearchAfter } from './find_gaps';
import { processAllGapsInTimeRange } from './process_all_gaps_in_time_range';
import { eventLogClientMock } from '@kbn/event-log-plugin/server/event_log_client.mock';
import type { Gap } from './gap';
import { gapStatus } from '../../../common/constants';
jest.mock('./find_gaps', () => {
return {
findGapsSearchAfter: jest.fn(),
};
});
const findGapsSearchAfterMock = findGapsSearchAfter as jest.Mock;
const range = (rangeStart: string, rangeEnd: string) => ({
gte: new Date(rangeStart),
lte: new Date(rangeEnd),
});
const createGap = (unfilledIntervals: Array<ReturnType<typeof range>>): Gap => {
return {
unfilledIntervals,
} as unknown as Gap;
};
const generateTestCaseData = (iterations: number) => {
const pageSize = 500;
const oneMinuteMs = 60 * 1000;
const endDate = new Date();
const startDate = new Date(endDate.getTime() - iterations * pageSize * oneMinuteMs);
let nextDate = startDate;
const getDateRange = () => {
const date = nextDate.toISOString();
nextDate = new Date(nextDate.getTime() + oneMinuteMs);
return range(date, nextDate.toISOString());
};
const findGapsSearchReturnValues = Array.from({ length: iterations }).map((_, idx) => {
return {
data: Array.from({ length: 500 }).map((__) => createGap([getDateRange()])),
searchAfter: idx + 1 === iterations ? null : `some-search-after-${idx}`,
pitId: `pitd-${idx}`,
};
});
return {
findGapsSearchReturnValues,
searchRange: {
start: startDate.toISOString(),
end: endDate.toISOString(),
},
};
};
describe('processAllGapsInTimeRange', () => {
const ruleId = 'some-rule-id';
const mockLogger = loggerMock.create();
const mockEventLogClient = eventLogClientMock.create();
let processGapsBatchMock: jest.Mock;
afterEach(() => {
jest.resetAllMocks();
});
beforeEach(() => {
let processGapsBatchCall = 1;
processGapsBatchMock = jest.fn(async () => processGapsBatchCall++);
});
describe('happy path', () => {
const { findGapsSearchReturnValues, searchRange } = generateTestCaseData(3);
const { start, end } = searchRange;
let results: Awaited<ReturnType<typeof processAllGapsInTimeRange>>;
beforeEach(async () => {
findGapsSearchReturnValues.forEach((returnValue) =>
findGapsSearchAfterMock.mockResolvedValueOnce(returnValue)
);
results = await processAllGapsInTimeRange({
ruleId,
start,
end,
logger: mockLogger,
eventLogClient: mockEventLogClient,
processGapsBatch: processGapsBatchMock,
});
});
it('should fetch all the gaps for the rule', () => {
const callParams = [
{ pitId: undefined, searchAfter: undefined },
{
pitId: findGapsSearchReturnValues[0].pitId,
searchAfter: findGapsSearchReturnValues[0].searchAfter,
},
{
pitId: findGapsSearchReturnValues[1].pitId,
searchAfter: findGapsSearchReturnValues[1].searchAfter,
},
];
expect(findGapsSearchAfterMock).toHaveBeenCalledTimes(callParams.length);
callParams.forEach(({ pitId, searchAfter }, idx) => {
const callOrder = idx + 1;
expect(findGapsSearchAfterMock).toHaveBeenNthCalledWith(callOrder, {
eventLogClient: mockEventLogClient,
logger: mockLogger,
params: {
ruleId,
start,
end,
perPage: 500,
statuses: [gapStatus.PARTIALLY_FILLED, gapStatus.UNFILLED],
sortField: '@timestamp',
sortOrder: 'asc',
searchAfter,
pitId,
},
});
});
});
it('should call closePointInTime when it is done', () => {
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('should call the processing gaps function passed as parameter', () => {
expect(processGapsBatchMock).toHaveBeenCalledTimes(findGapsSearchReturnValues.length);
findGapsSearchReturnValues.forEach(({ data }, idx) => {
const callOrder = idx + 1;
expect(processGapsBatchMock).toHaveBeenNthCalledWith(callOrder, data);
});
});
it('should return a list with the results of each call to processGapsBatch', () => {
// In this test we make processGapsBatch to return a number representing how many times it was called
expect(results).toEqual(findGapsSearchReturnValues.map((_, idx) => idx + 1));
});
});
describe('when there are no gaps to process', () => {
let results: Awaited<ReturnType<typeof processAllGapsInTimeRange>>;
beforeEach(async () => {
findGapsSearchAfterMock.mockResolvedValue({
data: [],
searchAfer: null,
pitId: null,
});
results = await processAllGapsInTimeRange({
ruleId,
start: new Date().toISOString(),
end: new Date().toISOString(),
logger: mockLogger,
eventLogClient: mockEventLogClient,
processGapsBatch: processGapsBatchMock,
});
});
it('should not call processGapsBatch', () => {
expect(processGapsBatchMock).not.toHaveBeenCalled();
});
it('should return an empty results array', () => {
expect(results).toEqual([]);
});
});
describe('when the max iterations are reached', () => {
const { findGapsSearchReturnValues, searchRange } = generateTestCaseData(10001);
const { start, end } = searchRange;
let results: Awaited<ReturnType<typeof processAllGapsInTimeRange>>;
beforeEach(async () => {
findGapsSearchAfterMock.mockImplementation(() => {
return {
data: [],
searchAfter: null,
pitId: null,
};
});
findGapsSearchReturnValues.forEach((returnValue) =>
findGapsSearchAfterMock.mockResolvedValueOnce(returnValue)
);
results = await processAllGapsInTimeRange({
ruleId,
start,
end,
logger: mockLogger,
eventLogClient: mockEventLogClient,
processGapsBatch: processGapsBatchMock,
});
});
it('should stop fetching gaps when the max number of iterations is reached', () => {
expect(findGapsSearchAfterMock).toHaveBeenCalledTimes(10000);
});
it('should call closePointInTime when it is done', () => {
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledTimes(1);
});
it('should return a list with the results of each call to processGapsBatch', () => {
// In this test we make processGapsBatch to return a number representing how many times it was called
expect(results).toEqual(findGapsSearchReturnValues.slice(0, 10000).map((_, idx) => idx + 1));
});
});
describe('when the max amount of gaps specified in the params is reached', () => {
const { findGapsSearchReturnValues, searchRange } = generateTestCaseData(3);
const { start, end } = searchRange;
let processedGapsCount = 0;
beforeEach(async () => {
processGapsBatchMock.mockImplementation(async (gaps) => {
processedGapsCount += gaps.length;
});
findGapsSearchReturnValues.forEach((returnValue) =>
findGapsSearchAfterMock.mockResolvedValueOnce(returnValue)
);
await processAllGapsInTimeRange({
ruleId,
start,
end,
logger: mockLogger,
options: {
maxFetchedGaps: 550,
},
eventLogClient: mockEventLogClient,
processGapsBatch: processGapsBatchMock,
});
});
afterEach(() => {
processedGapsCount = 0;
});
it('should stop fetching gaps when the max number of gaps is reached', () => {
expect(findGapsSearchAfterMock).toHaveBeenCalledTimes(2);
});
it('should only process the amount of gaps specified', () => {
expect(processedGapsCount).toEqual(550);
});
it('should call closePointInTime when it is done', () => {
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledTimes(1);
});
});
describe('when nextSearchAfter is null', () => {
const { findGapsSearchReturnValues, searchRange } = generateTestCaseData(2);
// Make it stop after the first result
findGapsSearchReturnValues[0].searchAfter = null;
const { start, end } = searchRange;
let processedGapsCount = 0;
beforeEach(async () => {
processGapsBatchMock.mockImplementation(async (gaps) => {
processedGapsCount += gaps.length;
});
findGapsSearchAfterMock.mockImplementation(() => {
return {
data: [],
searchAfter: null,
pitId: null,
};
});
findGapsSearchReturnValues.forEach((returnValue) =>
findGapsSearchAfterMock.mockResolvedValueOnce(returnValue)
);
await processAllGapsInTimeRange({
ruleId,
start,
end,
logger: mockLogger,
eventLogClient: mockEventLogClient,
processGapsBatch: processGapsBatchMock,
});
});
it('should stop fetching gaps', () => {
expect(findGapsSearchAfterMock).toHaveBeenCalledTimes(1);
});
it('should call closePointInTime when it is done', () => {
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledTimes(1);
});
});
describe('when there is an error', () => {
const thrownError = new Error('boom!');
let caughtError: Error;
const { findGapsSearchReturnValues, searchRange } = generateTestCaseData(2);
const { start, end } = searchRange;
beforeEach(async () => {
findGapsSearchAfterMock.mockResolvedValueOnce(findGapsSearchReturnValues[0]);
findGapsSearchAfterMock.mockRejectedValue(thrownError);
try {
await processAllGapsInTimeRange({
ruleId,
start,
end,
logger: mockLogger,
eventLogClient: mockEventLogClient,
processGapsBatch: processGapsBatchMock,
});
} catch (error) {
caughtError = error;
}
});
it('should propagate the error', () => {
expect(caughtError).toBe(thrownError);
});
it('should call closePointInTime when it is done', () => {
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledTimes(1);
});
});
});

View file

@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import type { IEventLogClient } from '@kbn/event-log-plugin/server';
import type { SortResults } from '@elastic/elasticsearch/lib/api/types';
import { findGapsSearchAfter } from './find_gaps';
import type { Gap } from './gap';
import type { GapStatus } from '../../../common/constants';
import { gapStatus } from '../../../common/constants';
interface ProcessAllGapsInTimeRangeParams<T> {
ruleId: string;
start: string;
end: string;
statuses?: GapStatus[];
options?: {
maxFetchedGaps?: number;
};
eventLogClient: IEventLogClient;
logger: Logger;
processGapsBatch: (gaps: Gap[]) => Promise<T>;
}
export const PROCESS_GAPS_DEFAULT_PAGE_SIZE = 500;
// Circuit breaker to prevent infinite loops
// It should be enough to update 5,000,000 gaps
// 10000 * 500 = 5,000,000 million gaps
const DEFAULT_MAX_ITERATIONS = 10000;
/**
* Fetches all gaps using search_after pagination to process more than 10,000 gaps with stable sorting
*/
export const processAllGapsInTimeRange = async <T>({
ruleId,
start,
end,
statuses = [gapStatus.PARTIALLY_FILLED, gapStatus.UNFILLED],
options,
logger,
eventLogClient,
processGapsBatch,
}: ProcessAllGapsInTimeRangeParams<T>): Promise<T[]> => {
let searchAfter: SortResults[] | undefined;
let pitId: string | undefined;
let iterationCount = 0;
let gapsCount = 0;
const processingResults: T[] = [];
const { maxFetchedGaps } = options ?? {};
try {
while (true) {
if (iterationCount >= DEFAULT_MAX_ITERATIONS) {
logger.warn(
`Circuit breaker triggered: Reached maximum number of iterations (${DEFAULT_MAX_ITERATIONS}) while processing gaps for rule ${ruleId}`
);
break;
}
iterationCount++;
const gapsResponse = await findGapsSearchAfter({
eventLogClient,
logger,
params: {
ruleId,
start,
end,
perPage: PROCESS_GAPS_DEFAULT_PAGE_SIZE,
statuses,
sortField: '@timestamp',
sortOrder: 'asc',
searchAfter,
pitId,
},
});
const { data: gaps, searchAfter: nextSearchAfter, pitId: nextPitId } = gapsResponse;
pitId = nextPitId;
gapsCount += gaps.length;
let gapsToProcess = gaps;
if (maxFetchedGaps && gapsCount > maxFetchedGaps) {
const offset = gapsCount - maxFetchedGaps;
gapsToProcess = gapsToProcess.slice(0, gaps.length - offset);
}
if (gapsToProcess.length > 0) {
processingResults.push(await processGapsBatch(gapsToProcess));
}
// Exit conditions: no more results or no next search_after or maxFetchedGaps reached
const maxGapsReached = maxFetchedGaps !== undefined && gapsCount >= maxFetchedGaps;
if (gapsToProcess.length === 0 || !nextSearchAfter || maxGapsReached) {
break;
}
searchAfter = nextSearchAfter;
}
} finally {
if (pitId) {
await eventLogClient.closePointInTime(pitId);
}
}
return processingResults;
};

View file

@ -6,7 +6,6 @@
*/
import { updateGaps } from './update_gaps';
import { findGapsSearchAfter } from '../find_gaps';
import { mgetGaps } from '../mget_gaps';
import { updateGapFromSchedule } from './update_gap_from_schedule';
import { calculateGapStateFromAllBackfills } from './calculate_gaps_state';
@ -19,11 +18,13 @@ import { savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { Gap } from '../gap';
import { adHocRunStatus } from '../../../../common/constants';
import { actionsClientMock } from '@kbn/actions-plugin/server/mocks';
import { processAllGapsInTimeRange } from '../process_all_gaps_in_time_range';
jest.mock('../find_gaps');
jest.mock('../mget_gaps');
jest.mock('./update_gap_from_schedule');
jest.mock('./calculate_gaps_state');
jest.mock('../process_all_gaps_in_time_range');
describe('updateGaps', () => {
const mockLogger = loggerMock.create();
@ -33,6 +34,9 @@ describe('updateGaps', () => {
const mockBackfillClient = backfillClientMock.create();
const mockActionsClient = actionsClientMock.create();
const processAllGapsInTimeRangeMock = processAllGapsInTimeRange as jest.Mock;
const mgetGapsMock = mgetGaps as jest.Mock;
const createTestGap = () =>
new Gap({
range: {
@ -49,22 +53,17 @@ describe('updateGaps', () => {
beforeEach(() => {
jest.resetAllMocks();
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [],
total: 0,
pitId: 'test-pit-id',
searchAfter: undefined,
processAllGapsInTimeRangeMock.mockImplementation(({ processGapsBatch }) => {
return processGapsBatch([]);
});
(mgetGaps as jest.Mock).mockResolvedValue([]);
mgetGapsMock.mockResolvedValue([]);
});
describe('updateGaps', () => {
it('should orchestrate the gap update process', async () => {
const testGap = createTestGap();
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
pitId: 'test-pit-id',
searchAfter: undefined,
processAllGapsInTimeRangeMock.mockImplementationOnce((callback) => {
return callback([testGap]);
});
await updateGaps({
@ -79,97 +78,37 @@ describe('updateGaps', () => {
actionsClient: mockActionsClient,
});
expect(findGapsSearchAfter).toHaveBeenCalledWith({
expect(processAllGapsInTimeRangeMock).toHaveBeenCalledWith({
eventLogClient: mockEventLogClient,
logger: mockLogger,
params: {
ruleId: 'test-rule-id',
start: '2024-01-01T00:00:00.000Z',
end: '2024-01-01T01:00:00.000Z',
perPage: 1000,
statuses: ['partially_filled', 'unfilled'],
sortField: '@timestamp',
sortOrder: 'asc',
searchAfter: undefined,
pitId: undefined,
},
ruleId: 'test-rule-id',
start: '2024-01-01T00:00:00.000Z',
end: '2024-01-01T01:00:00.000Z',
processGapsBatch: expect.any(Function),
});
expect(mockEventLogger.updateEvents).toHaveBeenCalled();
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledWith('test-pit-id');
});
it('should handle pagination with search_after', async () => {
const gaps = [
createTestGap(),
new Gap({
range: {
gte: '2024-01-01T01:00:00.000Z',
lte: '2024-01-01T02:00:00.000Z',
},
internalFields: {
_id: 'test-id-2',
_index: 'test-index',
_seq_no: 2,
_primary_term: 1,
},
}),
];
// Mock first page with search_after
const firstPageGaps = Array(500).fill(gaps[0]);
const secondPageGaps = [gaps[1]];
(findGapsSearchAfter as jest.Mock)
.mockResolvedValueOnce({
data: firstPageGaps,
pitId: 'test-pit-id',
searchAfter: ['2024-01-01T01:00:00.000Z'],
})
.mockResolvedValueOnce({
data: secondPageGaps,
pitId: 'test-pit-id',
searchAfter: undefined,
});
it('should skip fetching gaps when they are passed in as a param', async () => {
const testGap = createTestGap();
await updateGaps({
ruleId: 'test-rule-id',
start: new Date('2024-01-01T00:00:00.000Z'),
end: new Date('2024-01-01T02:00:00.000Z'),
end: new Date('2024-01-01T01:00:00.000Z'),
eventLogger: mockEventLogger,
eventLogClient: mockEventLogClient,
logger: mockLogger,
savedObjectsRepository: mockSavedObjectsRepository,
backfillClient: mockBackfillClient,
actionsClient: mockActionsClient,
gaps: [testGap],
});
expect(findGapsSearchAfter).toHaveBeenCalledTimes(2);
expect(findGapsSearchAfter).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
params: expect.objectContaining({
searchAfter: undefined,
pitId: undefined,
}),
})
);
expect(findGapsSearchAfter).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
params: expect.objectContaining({
searchAfter: ['2024-01-01T01:00:00.000Z'],
pitId: 'test-pit-id',
}),
})
);
expect(mockEventLogger.updateEvents).toHaveBeenCalledTimes(2);
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledWith('test-pit-id');
expect(processAllGapsInTimeRangeMock).not.toHaveBeenCalled();
});
});
describe('error handling', () => {
it('should handle findGaps errors', async () => {
(findGapsSearchAfter as jest.Mock).mockRejectedValue(new Error('Find gaps failed'));
processAllGapsInTimeRangeMock.mockRejectedValue(new Error('Find gaps failed'));
await updateGaps({
ruleId: 'test-rule-id',
@ -191,13 +130,10 @@ describe('updateGaps', () => {
it('should retry on conflict errors and refetch gap', async () => {
const testGap = createTestGap();
const updatedGap = createTestGap();
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
processAllGapsInTimeRangeMock.mockImplementation(({ processGapsBatch }) => {
return processGapsBatch([testGap]);
});
(mgetGaps as jest.Mock).mockResolvedValue([updatedGap]);
mgetGapsMock.mockResolvedValue([updatedGap]);
if (!testGap.internalFields?._id) {
throw new Error('Test gap should have internalFields._id');
@ -243,13 +179,10 @@ describe('updateGaps', () => {
it('should stop retrying after max attempts', async () => {
const testGap = createTestGap();
const updatedGap = createTestGap();
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
processAllGapsInTimeRangeMock.mockImplementation(({ processGapsBatch }) => {
return processGapsBatch([testGap]);
});
(mgetGaps as jest.Mock).mockResolvedValue([updatedGap]);
mgetGapsMock.mockResolvedValue([updatedGap]);
if (!testGap.internalFields?._id) {
throw new Error('Test gap should have internalFields._id');
@ -291,11 +224,8 @@ describe('updateGaps', () => {
it('should handle direct schedule updates', async () => {
const testGap = createTestGap();
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
processAllGapsInTimeRangeMock.mockImplementation(({ processGapsBatch }) => {
return processGapsBatch([testGap]);
});
await updateGaps({
@ -332,11 +262,8 @@ describe('updateGaps', () => {
it('should handle invalid scheduled items', async () => {
const testGap = createTestGap();
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
processAllGapsInTimeRangeMock.mockImplementation(({ processGapsBatch }) => {
return processGapsBatch([testGap]);
});
await updateGaps({
@ -391,11 +318,8 @@ describe('updateGaps', () => {
it('should trigger refetch when shouldRefetchAllBackfills is true', async () => {
const testGap = createTestGap();
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
processAllGapsInTimeRangeMock.mockImplementation(({ processGapsBatch }) => {
return processGapsBatch([testGap]);
});
await updateGaps({

View file

@ -7,19 +7,21 @@
import type { Logger, ISavedObjectsRepository } from '@kbn/core/server';
import type { IEventLogClient, IEventLogger } from '@kbn/event-log-plugin/server';
import type { SortResults } from '@elastic/elasticsearch/lib/api/types';
import type { ActionsClient } from '@kbn/actions-plugin/server';
import { chunk } from 'lodash';
import { withSpan } from '@kbn/apm-utils';
import type { BackfillClient } from '../../../backfill_client/backfill_client';
import { AlertingEventLogger } from '../../alerting_event_logger/alerting_event_logger';
import { findGapsSearchAfter } from '../find_gaps';
import type { Gap } from '../gap';
import { gapStatus } from '../../../../common/constants';
import type { BackfillSchedule } from '../../../application/backfill/result/types';
import { adHocRunStatus } from '../../../../common/constants';
import { calculateGapStateFromAllBackfills } from './calculate_gaps_state';
import { updateGapFromSchedule } from './update_gap_from_schedule';
import { mgetGaps } from '../mget_gaps';
import {
PROCESS_GAPS_DEFAULT_PAGE_SIZE,
processAllGapsInTimeRange,
} from '../process_all_gaps_in_time_range';
import type { ScheduledItem } from './utils';
import { findOverlappingIntervals, toScheduledItem } from './utils';
@ -35,11 +37,11 @@ interface UpdateGapsParams {
shouldRefetchAllBackfills?: boolean;
backfillClient: BackfillClient;
actionsClient: ActionsClient;
gaps?: Gap[];
}
const CONFLICT_STATUS_CODE = 409;
const MAX_RETRIES = 3;
const PAGE_SIZE = 1000;
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
export const prepareGapForUpdate = async (
@ -68,15 +70,10 @@ export const prepareGapForUpdate = async (
);
if (scheduledItems.length > 0 && !hasFailedBackfillTask) {
await withSpan(
{ name: 'updateGaps.prepareGapForUpdate.updateGapFromSchedule', type: 'rules' },
async () => {
updateGapFromSchedule({
gap,
scheduledItems,
});
}
);
updateGapFromSchedule({
gap,
scheduledItems,
});
}
if (hasFailedBackfillTask || scheduledItems.length === 0 || shouldRefetchAllBackfills) {
@ -227,10 +224,10 @@ const updateGapBatch = async (
/**
* Update gaps for a given rule
* Using search_after pagination to process more than 10,000 gaps with stable sorting
* Prepare gaps for update
* Update them in bulk
* If there are conflicts, retry the failed gaps
* If gaps are passed in, it skips fetching and process them instead
*/
export const updateGaps = async (params: UpdateGapsParams) => {
const {
@ -245,6 +242,7 @@ export const updateGaps = async (params: UpdateGapsParams) => {
shouldRefetchAllBackfills,
backfillClient,
actionsClient,
gaps,
} = params;
if (!eventLogger) {
@ -254,72 +252,42 @@ export const updateGaps = async (params: UpdateGapsParams) => {
try {
const alertingEventLogger = new AlertingEventLogger(eventLogger);
let hasErrors = false;
let searchAfter: SortResults[] | undefined;
let pitId: string | undefined;
let iterationCount = 0;
// Circuit breaker to prevent infinite loops
// It should be enough to update 50,000,000 gaps
// 100000 * 500 = 50,000,000 millions gaps
const MAX_ITERATIONS = 100000;
try {
while (true) {
if (iterationCount >= MAX_ITERATIONS) {
logger.warn(
`Circuit breaker triggered: Reached maximum number of iterations (${MAX_ITERATIONS}) while updating gaps for rule ${ruleId}`
);
break;
}
iterationCount++;
const gapsResponse = await findGapsSearchAfter({
eventLogClient,
const processGapsBatch = async (fetchedGaps: Gap[]) => {
if (fetchedGaps.length > 0) {
const success = await updateGapBatch(fetchedGaps, {
backfillSchedule,
savedObjectsRepository,
shouldRefetchAllBackfills,
backfillClient,
actionsClient,
alertingEventLogger,
logger,
params: {
ruleId,
start: start.toISOString(),
end: end.toISOString(),
perPage: PAGE_SIZE,
statuses: [gapStatus.PARTIALLY_FILLED, gapStatus.UNFILLED],
sortField: '@timestamp',
sortOrder: 'asc',
searchAfter,
pitId,
},
ruleId,
eventLogClient,
});
const { data: gaps, searchAfter: nextSearchAfter, pitId: nextPitId } = gapsResponse;
pitId = nextPitId;
if (gaps.length > 0) {
const success = await updateGapBatch(gaps, {
backfillSchedule,
savedObjectsRepository,
shouldRefetchAllBackfills,
backfillClient,
actionsClient,
alertingEventLogger,
logger,
ruleId,
eventLogClient,
});
if (!success) {
hasErrors = true;
}
if (!success) {
hasErrors = true;
}
// Exit conditions: no more results or no next search_after
if (gaps.length === 0 || !nextSearchAfter) {
break;
}
searchAfter = nextSearchAfter;
}
} finally {
if (pitId) {
await eventLogClient.closePointInTime(pitId);
};
if (gaps) {
// If the list of gaps were passed into the function, proceed to update them
for (const gapsChunk of chunk(gaps, PROCESS_GAPS_DEFAULT_PAGE_SIZE)) {
await processGapsBatch(gapsChunk);
}
} else {
// Otherwise fetch and update them
await processAllGapsInTimeRange({
ruleId,
start: start.toISOString(),
end: end.toISOString(),
logger,
eventLogClient,
processGapsBatch,
});
}
if (hasErrors) {

View file

@ -62,6 +62,7 @@ const createRulesClientMock = () => {
fillGapById: jest.fn(),
getRuleIdsWithGaps: jest.fn(),
getGapsSummaryByRuleIds: jest.fn(),
bulkFillGapsByRuleIds: jest.fn(),
};
return mocked;
};

View file

@ -9,7 +9,7 @@ import type { KueryNode } from '@kbn/es-query';
import { retryIfBulkEditConflicts } from './retry_if_bulk_edit_conflicts';
import { RETRY_IF_CONFLICTS_ATTEMPTS } from './wait_before_next_retry';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import type { BulkEditSkipReason } from '../../../common/bulk_edit';
import type { BulkEditSkipReason } from '../../../common/bulk_action';
import { RULE_SAVED_OBJECT_TYPE } from '../../saved_objects';
const mockFilter: KueryNode = {

View file

@ -13,7 +13,7 @@ import type {
SavedObjectsBulkUpdateObject,
SavedObjectsUpdateResponse,
} from '@kbn/core/server';
import type { BulkActionSkipResult } from '../../../common/bulk_edit';
import type { BulkEditActionSkipResult } from '../../../common/bulk_action';
import { convertRuleIdsToKueryNode } from '../../lib';
import type { BulkOperationError } from '../types';
import { waitBeforeNextRetry, RETRY_IF_CONFLICTS_ATTEMPTS } from './wait_before_next_retry';
@ -28,14 +28,14 @@ type BulkEditOperation = (filter: KueryNode | null) => Promise<{
rules: Array<SavedObjectsBulkUpdateObject<RawRule>>;
resultSavedObjects: Array<SavedObjectsUpdateResponse<RawRule>>;
errors: BulkOperationError[];
skipped: BulkActionSkipResult[];
skipped: BulkEditActionSkipResult[];
}>;
interface ReturnRetry {
apiKeysToInvalidate: string[];
results: Array<SavedObjectsUpdateResponse<RawRule>>;
errors: BulkOperationError[];
skipped: BulkActionSkipResult[];
skipped: BulkEditActionSkipResult[];
}
/**
@ -61,7 +61,7 @@ export const retryIfBulkEditConflicts = async (
accApiKeysToInvalidate: string[] = [],
accResults: Array<SavedObjectsUpdateResponse<RawRule>> = [],
accErrors: BulkOperationError[] = [],
accSkipped: BulkActionSkipResult[] = []
accSkipped: BulkEditActionSkipResult[] = []
): Promise<ReturnRetry> => {
// run the operation, return if no errors or throw if not a conflict error
try {

View file

@ -89,6 +89,11 @@ import type { GetGapsSummaryByRuleIdsParams } from '../application/rule/methods/
import type { FindGapsParams } from '../lib/rule_gaps/types';
import type { GetGlobalExecutionSummaryParams } from './methods/get_execution_summary';
import { getGlobalExecutionSummaryWithAuth } from './methods/get_execution_summary';
import { bulkFillGapsByRuleIds } from '../application/rule/methods/bulk_fill_gaps_by_rule_ids';
import type {
BulkFillGapsByRuleIdsOptions,
BulkFillGapsByRuleIdsParams,
} from '../application/rule/methods/bulk_fill_gaps_by_rule_ids/types';
export type ConstructorOptions = Omit<
RulesClientContext,
@ -229,6 +234,11 @@ export class RulesClient {
public fillGapById = (params: FillGapByIdParams) => fillGapById(this.context, params);
public bulkFillGapsByRuleIds = (
params: BulkFillGapsByRuleIdsParams,
options: BulkFillGapsByRuleIdsOptions
) => bulkFillGapsByRuleIds(this.context, params, options);
public getRuleIdsWithGaps = (params: GetRuleIdsWithGapsParams) =>
getRuleIdsWithGaps(this.context, params);

View file

@ -34,11 +34,14 @@ import {
export type BulkEditSkipReason = z.infer<typeof BulkEditSkipReason>;
export const BulkEditSkipReason = z.literal('RULE_NOT_MODIFIED');
export type BulkGapsFillingSkipReason = z.infer<typeof BulkGapsFillingSkipReason>;
export const BulkGapsFillingSkipReason = z.literal('NO_GAPS_TO_FILL');
export type BulkActionSkipResult = z.infer<typeof BulkActionSkipResult>;
export const BulkActionSkipResult = z.object({
id: z.string(),
name: z.string().optional(),
skip_reason: BulkEditSkipReason,
skip_reason: z.union([BulkEditSkipReason, BulkGapsFillingSkipReason]),
});
export type RuleDetailsInError = z.infer<typeof RuleDetailsInError>;
@ -56,6 +59,7 @@ export const BulkActionsDryRunErrCode = z.enum([
'ESQL_INDEX_PATTERN',
'MANUAL_RULE_RUN_FEATURE',
'MANUAL_RULE_RUN_DISABLED_RULE',
'RULE_FILL_GAPS_DISABLED_RULE',
]);
export type BulkActionsDryRunErrCodeEnum = typeof BulkActionsDryRunErrCode.enum;
export const BulkActionsDryRunErrCodeEnum = BulkActionsDryRunErrCode.enum;
@ -193,6 +197,26 @@ export const BulkManualRuleRun = BulkActionBase.merge(
})
);
export type BulkManualRuleFillGaps = z.infer<typeof BulkManualRuleFillGaps>;
export const BulkManualRuleFillGaps = BulkActionBase.merge(
z.object({
action: z.literal('fill_gaps'),
/**
* Object that describes applying a manual gap fill action for the specified time range.
*/
fill_gaps: z.object({
/**
* Start date of the manual gap fill
*/
start_date: z.string(),
/**
* End date of the manual gap fill
*/
end_date: z.string(),
}),
})
);
/**
* Defines the maximum interval in which a rules actions are executed.
> info
@ -214,6 +238,7 @@ export const BulkActionType = z.enum([
'duplicate',
'edit',
'run',
'fill_gaps',
]);
export type BulkActionTypeEnum = typeof BulkActionType.enum;
export const BulkActionTypeEnum = BulkActionType.enum;
@ -407,6 +432,7 @@ export const PerformRulesBulkActionRequestBody = z.union([
BulkExportRules,
BulkDuplicateRules,
BulkManualRuleRun,
BulkManualRuleFillGaps,
BulkEditRules,
]);
export type PerformRulesBulkActionRequestBodyInput = z.input<

View file

@ -45,6 +45,7 @@ paths:
- $ref: '#/components/schemas/BulkExportRules'
- $ref: '#/components/schemas/BulkDuplicateRules'
- $ref: '#/components/schemas/BulkManualRuleRun'
- $ref: '#/components/schemas/BulkManualRuleFillGaps'
- $ref: '#/components/schemas/BulkEditRules'
examples:
example01:
@ -359,6 +360,18 @@ paths:
eventAction: trigger
timestamp: 2023-10-31T00:00:00Z
group: default3
example27:
summary: Fill Gaps - Manually trigger the filling of gaps for specified rules
description: The following request triggers the filling of gaps for the specified rule ids and time range
value:
action: 'fill_gaps'
ids:
- '748694f0-6977-4ea5-8384-cd2e39730779'
- '164d0918-f720-4c9f-9f5c-c5122587cf19'
run:
start_date: '2025-03-01T00:00:00.000Z'
end_date: '2025-03-10T23:59:59.999Z'
responses:
200:
description: OK
@ -1006,6 +1019,11 @@ components:
type: string
enum:
- RULE_NOT_MODIFIED
BulkGapsFillingSkipReason:
type: string
enum:
- NO_GAPS_TO_FILL
BulkActionSkipResult:
type: object
@ -1015,7 +1033,9 @@ components:
name:
type: string
skip_reason:
$ref: '#/components/schemas/BulkEditSkipReason'
oneOf:
- $ref: '#/components/schemas/BulkEditSkipReason'
- $ref: '#/components/schemas/BulkGapsFillingSkipReason'
required:
- id
- skip_reason
@ -1040,6 +1060,7 @@ components:
- ESQL_INDEX_PATTERN
- MANUAL_RULE_RUN_FEATURE
- MANUAL_RULE_RUN_DISABLED_RULE
- RULE_FILL_GAPS_DISABLED_RULE
NormalizedRuleError:
type: object
@ -1245,6 +1266,31 @@ components:
required:
- action
- run
BulkManualRuleFillGaps:
allOf:
- $ref: '#/components/schemas/BulkActionBase'
- type: object
properties:
action:
type: string
enum: [fill_gaps]
fill_gaps:
type: object
description: Object that describes applying a manual gap fill action for the specified time range.
properties:
start_date:
type: string
description: Start date of the manual gap fill
end_date:
type: string
description: End date of the manual gap fill
required:
- start_date
- end_date
required:
- action
- fill_gaps
ThrottleForBulkActions:
type: string
@ -1269,6 +1315,7 @@ components:
- duplicate
- edit
- run
- fill_gaps
BulkActionEditType:
type: string

View file

@ -34,7 +34,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 4 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 6 more"`
);
});
@ -46,7 +46,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 4 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 6 more"`
);
});
@ -74,7 +74,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"ids: Expected array, received string, action: Invalid literal value, expected \\"delete\\", ids: Expected array, received string, action: Invalid literal value, expected \\"disable\\", ids: Expected array, received string, and 10 more"`
`"ids: Expected array, received string, action: Invalid literal value, expected \\"delete\\", ids: Expected array, received string, action: Invalid literal value, expected \\"disable\\", ids: Expected array, received string, and 13 more"`
);
});
});
@ -154,7 +154,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 3 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 5 more"`
);
});
@ -185,7 +185,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 3 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 5 more"`
);
});
@ -200,7 +200,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 3 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 5 more"`
);
});
});
@ -217,7 +217,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 13 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 15 more"`
);
});
@ -279,7 +279,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 13 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 15 more"`
);
});
@ -397,7 +397,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 11 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 13 more"`
);
});
@ -419,7 +419,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 14 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 16 more"`
);
});
@ -457,7 +457,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 11 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 13 more"`
);
});
@ -502,7 +502,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 14 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 16 more"`
);
});
@ -524,7 +524,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 14 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 16 more"`
);
});
@ -562,7 +562,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 11 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 13 more"`
);
});
@ -584,7 +584,7 @@ describe('Perform bulk action request schema', () => {
expectParseError(result);
expect(stringifyZodError(result.error)).toMatchInlineSnapshot(
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 15 more"`
`"action: Invalid literal value, expected \\"delete\\", action: Invalid literal value, expected \\"disable\\", action: Invalid literal value, expected \\"enable\\", action: Invalid literal value, expected \\"export\\", action: Invalid literal value, expected \\"duplicate\\", and 17 more"`
);
});

View file

@ -2056,6 +2056,21 @@ paths:
timestamp: 2023-10-31T00:00:00.000Z
ids:
- 9e946bfc-3118-4c77-bb25-67d781191921
example27:
description: >-
The following request triggers the filling of gaps for the
specified rule ids and time range
summary: >-
Fill Gaps - Manually trigger the filling of gaps for specified
rules
value:
action: fill_gaps
ids:
- 748694f0-6977-4ea5-8384-cd2e39730779
- 164d0918-f720-4c9f-9f5c-c5122587cf19
run:
end_date: '2025-03-10T23:59:59.999Z'
start_date: '2025-03-01T00:00:00.000Z'
schema:
oneOf:
- $ref: '#/components/schemas/BulkDeleteRules'
@ -2064,6 +2079,7 @@ paths:
- $ref: '#/components/schemas/BulkExportRules'
- $ref: '#/components/schemas/BulkDuplicateRules'
- $ref: '#/components/schemas/BulkManualRuleRun'
- $ref: '#/components/schemas/BulkManualRuleFillGaps'
- $ref: '#/components/schemas/BulkEditRules'
responses:
'200':
@ -4676,6 +4692,7 @@ components:
- ESQL_INDEX_PATTERN
- MANUAL_RULE_RUN_FEATURE
- MANUAL_RULE_RUN_DISABLED_RULE
- RULE_FILL_GAPS_DISABLED_RULE
type: string
BulkActionSkipResult:
type: object
@ -4685,7 +4702,9 @@ components:
name:
type: string
skip_reason:
$ref: '#/components/schemas/BulkEditSkipReason'
oneOf:
- $ref: '#/components/schemas/BulkEditSkipReason'
- $ref: '#/components/schemas/BulkGapsFillingSkipReason'
required:
- id
- skip_reason
@ -4945,6 +4964,52 @@ components:
type: string
required:
- action
BulkGapsFillingSkipReason:
enum:
- NO_GAPS_TO_FILL
type: string
BulkManualRuleFillGaps:
type: object
properties:
action:
enum:
- fill_gaps
type: string
fill_gaps:
description: >-
Object that describes applying a manual gap fill action for the
specified time range.
type: object
properties:
end_date:
description: End date of the manual gap fill
type: string
start_date:
description: Start date of the manual gap fill
type: string
required:
- start_date
- end_date
gaps_range_end:
description: Gaps range end, valid only when query is provided
type: string
gaps_range_start:
description: Gaps range start, valid only when query is provided
type: string
ids:
description: >-
Array of rule IDs. Array of rule IDs to which a bulk action will be
applied. Only valid when query property is undefined.
items:
type: string
minItems: 1
type: array
query:
description: Query to filter rules.
type: string
required:
- action
- fill_gaps
BulkManualRuleRun:
type: object
properties:

View file

@ -1920,6 +1920,21 @@ paths:
timestamp: 2023-10-31T00:00:00.000Z
ids:
- 9e946bfc-3118-4c77-bb25-67d781191921
example27:
description: >-
The following request triggers the filling of gaps for the
specified rule ids and time range
summary: >-
Fill Gaps - Manually trigger the filling of gaps for specified
rules
value:
action: fill_gaps
ids:
- 748694f0-6977-4ea5-8384-cd2e39730779
- 164d0918-f720-4c9f-9f5c-c5122587cf19
run:
end_date: '2025-03-10T23:59:59.999Z'
start_date: '2025-03-01T00:00:00.000Z'
schema:
oneOf:
- $ref: '#/components/schemas/BulkDeleteRules'
@ -1928,6 +1943,7 @@ paths:
- $ref: '#/components/schemas/BulkExportRules'
- $ref: '#/components/schemas/BulkDuplicateRules'
- $ref: '#/components/schemas/BulkManualRuleRun'
- $ref: '#/components/schemas/BulkManualRuleFillGaps'
- $ref: '#/components/schemas/BulkEditRules'
responses:
'200':
@ -4006,6 +4022,7 @@ components:
- ESQL_INDEX_PATTERN
- MANUAL_RULE_RUN_FEATURE
- MANUAL_RULE_RUN_DISABLED_RULE
- RULE_FILL_GAPS_DISABLED_RULE
type: string
BulkActionSkipResult:
type: object
@ -4015,7 +4032,9 @@ components:
name:
type: string
skip_reason:
$ref: '#/components/schemas/BulkEditSkipReason'
oneOf:
- $ref: '#/components/schemas/BulkEditSkipReason'
- $ref: '#/components/schemas/BulkGapsFillingSkipReason'
required:
- id
- skip_reason
@ -4275,6 +4294,52 @@ components:
type: string
required:
- action
BulkGapsFillingSkipReason:
enum:
- NO_GAPS_TO_FILL
type: string
BulkManualRuleFillGaps:
type: object
properties:
action:
enum:
- fill_gaps
type: string
fill_gaps:
description: >-
Object that describes applying a manual gap fill action for the
specified time range.
type: object
properties:
end_date:
description: End date of the manual gap fill
type: string
start_date:
description: Start date of the manual gap fill
type: string
required:
- start_date
- end_date
gaps_range_end:
description: Gaps range end, valid only when query is provided
type: string
gaps_range_start:
description: Gaps range start, valid only when query is provided
type: string
ids:
description: >-
Array of rule IDs. Array of rule IDs to which a bulk action will be
applied. Only valid when query property is undefined.
items:
type: string
minItems: 1
type: array
query:
description: Query to filter rules.
type: string
required:
- action
- fill_gaps
BulkManualRuleRun:
type: object
properties:

View file

@ -1331,6 +1331,44 @@ export const RULES_BULK_MANUAL_RULE_RUN_FAILURE_DESCRIPTION = (failedRulesCount:
}
);
/**
* Bulk fill gaps for rules
*/
export const RULES_BULK_FILL_GAPS_SUCCESS = i18n.translate(
'xpack.securitySolution.detectionEngine.rules.allRules.bulkActions.fillRuleGaps.successToastTitle',
{
defaultMessage: 'Gaps filling for rules scheduled',
}
);
export const RULES_BULK_FILL_GAPS_FAILURE = i18n.translate(
'xpack.securitySolution.detectionEngine.rules.allRules.bulkActions.fillRuleGaps.errorToastTitle',
{
defaultMessage: 'Error scheduling gaps filling for rules',
}
);
export const RULES_BULK_FILL_GAPS_SUCCESS_DESCRIPTION = (totalRules: number) =>
i18n.translate(
'xpack.securitySolution.detectionEngine.rules.allRules.bulkActions.fillRuleGaps.successToastDescription',
{
values: { totalRules },
defaultMessage:
'Successfully scheduled gaps filling for {totalRules, plural, =1 {{totalRules} rule} other {{totalRules} rules}}',
}
);
export const RULES_BULK_FILL_GAPS_FAILURE_DESCRIPTION = (failedRulesCount: number) =>
i18n.translate(
'xpack.securitySolution.detectionEngine.rules.allRules.bulkActions.fillRuleGaps.errorToastDescription',
{
values: { failedRulesCount },
defaultMessage:
'Failed to schedule gaps filling for {failedRulesCount, plural, =0 {} =1 {# rule} other {# rules}}.',
}
);
/**
* Bulk Edit
*/

View file

@ -37,6 +37,9 @@ export function summarizeBulkSuccess(action: BulkActionType): string {
case BulkActionTypeEnum.run:
return i18n.RULES_BULK_MANUAL_RULE_RUN_SUCCESS;
case BulkActionTypeEnum.fill_gaps:
return i18n.RULES_BULK_FILL_GAPS_SUCCESS;
case BulkActionTypeEnum.edit:
return i18n.RULES_BULK_EDIT_SUCCESS;
}
@ -64,6 +67,9 @@ export function explainBulkSuccess(
case BulkActionTypeEnum.run:
return i18n.RULES_BULK_MANUAL_RULE_RUN_SUCCESS_DESCRIPTION(summary.succeeded);
case BulkActionTypeEnum.fill_gaps:
return i18n.RULES_BULK_FILL_GAPS_SUCCESS_DESCRIPTION(summary.succeeded);
}
}
@ -110,6 +116,9 @@ export function summarizeBulkError(action: BulkActionType): string {
case BulkActionTypeEnum.run:
return i18n.RULES_BULK_MANUAL_RULE_RUN_FAILURE;
case BulkActionTypeEnum.fill_gaps:
return i18n.RULES_BULK_FILL_GAPS_FAILURE;
case BulkActionTypeEnum.edit:
return i18n.RULES_BULK_EDIT_FAILURE;
}
@ -142,6 +151,9 @@ export function explainBulkError(action: BulkActionType, error: HTTPError): stri
case BulkActionTypeEnum.run:
return i18n.RULES_BULK_MANUAL_RULE_RUN_FAILURE_DESCRIPTION(summary.failed);
case BulkActionTypeEnum.fill_gaps:
return i18n.RULES_BULK_FILL_GAPS_FAILURE_DESCRIPTION(summary.failed);
case BulkActionTypeEnum.edit:
return i18n.RULES_BULK_EDIT_FAILURE_DESCRIPTION(summary.failed, summary.skipped);
}

View file

@ -142,6 +142,7 @@ export type LoadingRuleAction =
| 'load'
| 'edit'
| 'run'
| 'fill_gaps'
| null;
export interface LoadingRules {

View file

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { identity, keyBy } from 'lodash';
import type { RulesClient, BulkOperationError } from '@kbn/alerting-plugin/server';
import type { MlAuthz } from '../../../../../machine_learning/authz';
import type { BulkManualRuleFillGaps } from '../../../../../../../common/api/detection_engine';
import type { PromisePoolError } from '../../../../../../utils/promise_pool';
import type { RuleAlertType } from '../../../../rule_schema';
import { validateBulkRuleGapFilling } from '../../../logic/bulk_actions/validations';
interface BuildScheduleRuleGapFillingParams {
rules: RuleAlertType[];
isDryRun?: boolean;
rulesClient: RulesClient;
mlAuthz: MlAuthz;
fillGapsPayload: BulkManualRuleFillGaps['fill_gaps'];
}
interface BulkScheduleBackfillOutcome {
backfilled: RuleAlertType[];
errors: Array<PromisePoolError<RuleAlertType, Error> | BulkOperationError>;
skipped: Array<Pick<RuleAlertType, 'id' | 'name'>>;
}
export const bulkScheduleRuleGapFilling = async ({
rules,
isDryRun,
rulesClient,
mlAuthz,
fillGapsPayload,
}: BuildScheduleRuleGapFillingParams): Promise<BulkScheduleBackfillOutcome> => {
const errors: Array<PromisePoolError<RuleAlertType, Error> | BulkOperationError> = [];
// In the first step, we validate if it is possible to schedule backfill for the rules
const validationResults = await Promise.all(
rules.map(async (rule) => {
try {
await validateBulkRuleGapFilling({
mlAuthz,
rule,
});
return { valid: true, rule };
} catch (error) {
return { valid: false, rule, error };
}
})
);
const validatedRules = validationResults.filter(({ valid }) => valid).map(({ rule }) => rule);
errors.push(
...validationResults
.filter(({ valid }) => !valid)
.map(({ rule, error }) => ({ item: rule, error }))
);
if (isDryRun || validatedRules.length === 0) {
return {
backfilled: validatedRules,
errors,
skipped: [],
};
}
const { start_date: start, end_date: end } = fillGapsPayload;
// Due to performance considerations we will backfill a maximum of 1000 gaps per rule when called with many rules
// however, this endpoint will be called with one rule as well. In that case, we will increase the limit to 10_000
// in order to attempt to fill all the gaps of the rule in the specified time range
const maxGapCountPerRule = rules.length === 1 ? 10_000 : 1000;
const { backfilled, skipped, errored } = await rulesClient.bulkFillGapsByRuleIds(
{
rules: validatedRules.map(({ id, name, consumer, alertTypeId }) => ({
id,
name,
consumer,
alertTypeId,
})),
range: {
start,
end,
},
},
{
maxGapCountPerRule,
}
);
errored.forEach((backfillingError) => {
errors.push({
rule: backfillingError.rule,
message: `${backfillingError.step} - ${backfillingError.errorMessage}`,
});
});
const rulesDict = keyBy(validatedRules, 'id');
return {
backfilled: backfilled.map(({ id }) => rulesDict[id]).filter(identity),
errors,
skipped,
};
};

View file

@ -12,6 +12,7 @@ import type { PromisePoolError } from '../../../../../../utils/promise_pool';
import type { MlAuthz } from '../../../../../machine_learning/authz';
import type { RuleAlertType } from '../../../../rule_schema';
import { validateBulkScheduleBackfill } from '../../../logic/bulk_actions/validations';
import { handleScheduleBackfillResults } from './utils';
interface BulkScheduleBackfillArgs {
rules: RuleAlertType[];
@ -65,32 +66,14 @@ export const bulkScheduleBackfill = async ({
}));
// Perform actual schedule using the rulesClient
const results = await rulesClient.scheduleBackfill(params);
return results.reduce(
(acc, backfillResult) => {
if ('error' in backfillResult) {
const ruleName = validatedRules.find(
(rule) => rule.id === backfillResult.error.rule.id
)?.name;
const backfillError = backfillResult.error;
const backfillRule = backfillError.rule;
const error = {
message: backfillError.message,
status: backfillError.status,
rule: { id: backfillRule.id, name: backfillRule.name ?? ruleName ?? '' },
};
acc.errors.push(error);
} else {
const backfillRule = validatedRules.find((rule) => rule.id === backfillResult.rule.id);
if (backfillRule) {
acc.backfilled.push(backfillRule);
}
}
return acc;
},
{ backfilled: [], errors } as {
backfilled: RuleAlertType[];
errors: Array<PromisePoolError<RuleAlertType, Error> | BulkOperationError>;
}
);
const backfillResults = await rulesClient.scheduleBackfill(params);
const processedResults = handleScheduleBackfillResults({
results: backfillResults,
rules: validatedRules,
});
return {
backfilled: processedResults.backfilled,
errors: [...errors, ...processedResults.errors],
};
};

View file

@ -507,7 +507,7 @@ describe('Perform bulk action route', () => {
});
const result = server.validate(request);
expect(result.badRequest).toHaveBeenCalledWith(
'action: Invalid literal value, expected "delete", action: Invalid literal value, expected "disable", action: Invalid literal value, expected "enable", action: Invalid literal value, expected "export", action: Invalid literal value, expected "duplicate", and 4 more'
'action: Invalid literal value, expected "delete", action: Invalid literal value, expected "disable", action: Invalid literal value, expected "enable", action: Invalid literal value, expected "export", action: Invalid literal value, expected "duplicate", and 6 more'
);
});
@ -519,7 +519,7 @@ describe('Perform bulk action route', () => {
});
const result = server.validate(request);
expect(result.badRequest).toHaveBeenCalledWith(
'action: Invalid literal value, expected "delete", action: Invalid literal value, expected "disable", action: Invalid literal value, expected "enable", action: Invalid literal value, expected "export", action: Invalid literal value, expected "duplicate", and 4 more'
'action: Invalid literal value, expected "delete", action: Invalid literal value, expected "disable", action: Invalid literal value, expected "enable", action: Invalid literal value, expected "export", action: Invalid literal value, expected "duplicate", and 6 more'
);
});
@ -553,7 +553,7 @@ describe('Perform bulk action route', () => {
});
const result = server.validate(request);
expect(result.badRequest).toHaveBeenCalledWith(
'ids: Expected array, received string, action: Invalid literal value, expected "delete", ids: Expected array, received string, ids: Expected array, received string, action: Invalid literal value, expected "enable", and 10 more'
'ids: Expected array, received string, action: Invalid literal value, expected "delete", ids: Expected array, received string, ids: Expected array, received string, action: Invalid literal value, expected "enable", and 13 more'
);
});

View file

@ -43,6 +43,7 @@ import { bulkEnableDisableRules } from './bulk_enable_disable_rules';
import { fetchRulesByQueryOrIds } from './fetch_rules_by_query_or_ids';
import { bulkScheduleBackfill } from './bulk_schedule_rule_run';
import { createPrebuiltRuleAssetsClient } from '../../../../prebuilt_rules/logic/rule_assets/prebuilt_rule_assets_client';
import { bulkScheduleRuleGapFilling } from './bulk_schedule_rule_gap_filling';
const MAX_RULES_TO_PROCESS_TOTAL = 10000;
// Set a lower limit for bulk edit as the rules client might fail with a "Query
@ -390,6 +391,29 @@ export const performBulkActionRoute = (
});
errors.push(...bulkActionErrors);
updated = backfilled.filter((rule): rule is RuleAlertType => rule !== null);
break;
}
case BulkActionTypeEnum.fill_gaps: {
const {
backfilled,
errors: bulkActionErrors,
skipped: skippedRules,
} = await bulkScheduleRuleGapFilling({
rules,
isDryRun,
rulesClient,
mlAuthz,
fillGapsPayload: body.fill_gaps,
});
errors.push(...bulkActionErrors);
updated = backfilled;
skipped = skippedRules.map((rule) => {
return {
...rule,
skip_reason: 'NO_GAPS_TO_FILL',
};
});
}
}

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ScheduleBackfillResults } from '@kbn/alerting-plugin/server/application/backfill/methods/schedule/types';
import type { BulkOperationError } from '@kbn/alerting-plugin/server';
import type { PromisePoolError } from '../../../../../../utils/promise_pool';
import type { RuleAlertType } from '../../../../rule_schema';
interface HandleScheduleBackfillResultsParams {
rules: RuleAlertType[];
results: ScheduleBackfillResults;
}
interface HandleScheduleBackfillResultsOutcome {
backfilled: RuleAlertType[];
errors: Array<PromisePoolError<RuleAlertType, Error> | BulkOperationError>;
}
export const handleScheduleBackfillResults = ({
results,
rules,
}: HandleScheduleBackfillResultsParams): HandleScheduleBackfillResultsOutcome => {
const errors: Array<PromisePoolError<RuleAlertType, Error> | BulkOperationError> = [];
return results.reduce(
(acc, backfillResult) => {
if ('error' in backfillResult) {
const ruleName = rules.find((rule) => rule.id === backfillResult.error.rule.id)?.name;
const backfillError = backfillResult.error;
const backfillRule = backfillError.rule;
const error = {
message: backfillError.message,
status: backfillError.status,
rule: { id: backfillRule.id, name: backfillRule.name ?? ruleName ?? '' },
};
acc.errors.push(error);
} else {
const backfillRule = rules.find((rule) => rule.id === backfillResult.rule.id);
if (backfillRule) {
acc.backfilled.push(backfillRule);
}
}
return acc;
},
{ backfilled: [], errors } as HandleScheduleBackfillResultsOutcome
);
};

View file

@ -75,6 +75,17 @@ export const validateBulkScheduleBackfill = async ({ rule }: BulkActionsValidati
);
};
/**
* runs validation for bulk gap filling for a single rule
* @param params - {@link DryRunRuleFillGapsBulkActionsValidationArgs}
*/
export const validateBulkRuleGapFilling = async ({ rule }: BulkActionsValidationArgs) => {
await throwDryRunError(
() => invariant(rule.enabled, 'Cannot bulk fill gaps for a disabled rule'),
BulkActionsDryRunErrCodeEnum.RULE_FILL_GAPS_DISABLED_RULE
);
};
interface BulkEditBulkActionsValidationArgs {
ruleType: RuleType;
mlAuthz: MlAuthz;

View file

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type SuperTest from 'supertest';
import { FindGapsResponse } from '@kbn/alerting-plugin/common/routes/gaps/apis/find';
import { routeWithNamespace } from '../route_with_namespace';
export const getGapsByRuleId = async (
supertest: SuperTest.Agent,
ruleId: string,
{ start, end }: { start: string; end: string },
perPage: number,
namespace: string = 'default'
) => {
const response = (await supertest
.post(routeWithNamespace(`/internal/alerting/rules/gaps/_find`, namespace))
.set('kbn-xsrf', 'foo')
.send({
rule_id: ruleId,
start,
end,
per_page: perPage,
})) as FindGapsResponse;
return response.body.data;
};

View file

@ -38,6 +38,9 @@ import { createRule, deleteAllRules } from '../../../../../../common/utils/secur
import { deleteAllExceptions } from '../../../../lists_and_exception_lists/utils';
import { FtrProviderContext } from '../../../../../ftr_provider_context';
import { deleteAllGaps } from '../../../utils/event_log/delete_all_gaps';
import { GapEvent, generateGapsForRule } from '../../../utils/event_log/generate_gaps_for_rule';
import { getGapsByRuleId } from '../../../../../../common/utils/security_solution/detections_response/rules/get_gaps_by_rule_id';
export default ({ getService }: FtrProviderContext): void => {
const supertest = getService('supertest');
@ -2612,6 +2615,300 @@ export default ({ getService }: FtrProviderContext): void => {
});
});
describe('@skipInServerlessMKI fill gaps run action', () => {
const intervalInMinutes = 5;
const interval = `${intervalInMinutes}m`;
const totalRules = 3;
type GapFromEvent = GapEvent['kibana']['alert']['rule']['gap'] & { _id: string };
let generatedGapEvents: Record<
string,
{ rule: { id: string; name: string }; gapEvents: GapFromEvent[] }
>;
const backfillStart = new Date(Date.now() - 89 * 24 * 60 * 60 * 1000);
const backfillEnd = new Date();
const resetEverything = async () => {
await deleteAllGaps(es);
await deleteAllRules(supertest, log);
};
afterEach(resetEverything);
beforeEach(async () => {
generatedGapEvents = {};
for (let idx = 0; idx < totalRules; idx++) {
const rule = await createRule(
supertest,
log,
getCustomQueryRuleParams({
rule_id: idx.toString(),
enabled: true,
interval,
}),
'default'
);
const { gapEvents } = await generateGapsForRule(es, rule, 100);
generatedGapEvents[rule.id] = {
rule,
gapEvents: gapEvents.map((gapEvent) => {
if (!gapEvent._id) {
throw new Error('generated gap event id cannot be undefined');
}
return { ...gapEvent.kibana.alert.rule.gap, _id: gapEvent._id };
}),
};
}
});
it('should trigger the backfilling of the gaps for the rules in the request', async () => {
// Only backfill the first 2 rules
const ruleIdsToBackfill = Object.keys(generatedGapEvents).slice(0, 2);
// Trigger the backfill for the selected rules
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: {},
body: {
ids: ruleIdsToBackfill,
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: backfillStart.toISOString(),
end_date: backfillEnd.toISOString(),
},
},
})
.expect(200);
expect(body.success).toEqual(true);
expect(body.attributes.summary).toEqual({
failed: 0,
succeeded: 2,
skipped: 0,
total: 2,
});
const expectedUpdatedRules = Object.values(generatedGapEvents)
.slice(0, 2)
.map((event) => event.rule);
expect(body.attributes.results).toEqual({
updated: expect.arrayContaining(
expectedUpdatedRules.map((expected) => expect.objectContaining(expected))
),
created: [],
deleted: [],
skipped: [],
});
for (const ruleId of ruleIdsToBackfill) {
const fetchedGaps = await getGapsByRuleId(
supertest,
ruleId,
{ start: backfillStart.toISOString(), end: backfillEnd.toISOString() },
100
);
const generatedGaps = generatedGapEvents[ruleId].gapEvents;
// Verify that every single gap is marked as in progress
generatedGaps.forEach((generatedGap) => {
const fetchedGap = fetchedGaps.find(({ _id }) => _id === generatedGap._id);
expect(fetchedGap?.unfilled_intervals).toEqual([]);
expect(fetchedGap?.in_progress_intervals).toEqual(generatedGap.unfilled_intervals);
});
}
// For the rules we didn't backfill, verify that their gaps are still unfilled
for (const ruleId of Object.keys(generatedGapEvents).slice(2)) {
const fetchedGaps = await getGapsByRuleId(
supertest,
ruleId,
{ start: backfillStart.toISOString(), end: backfillEnd.toISOString() },
100
);
const generatedGaps = generatedGapEvents[ruleId].gapEvents;
generatedGaps.forEach((generatedGap) => {
const fetchedGap = fetchedGaps.find(({ _id }) => _id === generatedGap._id);
expect(fetchedGap?.unfilled_intervals).toEqual(generatedGap.unfilled_intervals);
expect(fetchedGap?.in_progress_intervals).toEqual([]);
});
}
});
it('should return 400 error when the end date is not strictly greater than the start date', async () => {
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: {},
body: {
ids: Object.keys(generatedGapEvents),
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: backfillStart.toISOString(),
end_date: backfillStart.toISOString(),
},
},
})
.expect(400);
expect(body.message).toContain('Backfill end must be greater than backfill start');
});
it('should return 400 error when start date is in the future', async () => {
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: {},
body: {
ids: Object.keys(generatedGapEvents),
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: new Date(Date.now() + 1000).toISOString(),
end_date: backfillEnd.toISOString(),
},
},
})
.expect(400);
expect(body.message).toContain('Backfill cannot be scheduled for the future');
});
it('should return 400 error when end date is in the future', async () => {
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: {},
body: {
ids: Object.keys(generatedGapEvents),
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: backfillStart.toISOString(),
end_date: new Date(Date.now() + 5 * 60 * 1000).toISOString(),
},
},
})
.expect(400);
expect(body.message).toContain('Backfill cannot be scheduled for the future');
});
it('should return 400 error when range between start and end are greater than 90 days', async () => {
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: {},
body: {
ids: Object.keys(generatedGapEvents),
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: new Date(
backfillEnd.getTime() - 1 - 90 * 24 * 60 * 60 * 1000
).toISOString(),
end_date: backfillEnd.toISOString(),
},
},
})
.expect(400);
expect(body.message).toContain('Backfill cannot look back more than 90 days');
});
it('should return 500 error if some rules do not exist', async () => {
const existentRules = Object.keys(generatedGapEvents);
const nonExistentRule = 'non-existent-rule';
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: {},
body: {
ids: [...existentRules, nonExistentRule],
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: backfillStart.toISOString(),
end_date: backfillEnd.toISOString(),
},
},
})
.expect(500);
expect(body.attributes.summary).toEqual({
failed: 1,
skipped: 0,
succeeded: existentRules.length,
total: existentRules.length + 1,
});
expect(body.attributes.errors).toHaveLength(1);
expect(body.attributes.errors[0]).toEqual({
message: 'Rule not found',
status_code: 500,
rules: [
{
id: nonExistentRule,
},
],
});
});
it('should return 500 error if some rules are disabled', async () => {
const enabledRules = Object.keys(generatedGapEvents);
const disabledRule = await createRule(
supertest,
log,
getCustomQueryRuleParams({
rule_id: 'rule-disabled',
enabled: false,
interval,
})
);
await generateGapsForRule(es, disabledRule, 100);
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: {},
body: {
ids: [...enabledRules, disabledRule.id],
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: backfillStart.toISOString(),
end_date: backfillEnd.toISOString(),
},
},
})
.expect(500);
expect(body.attributes.summary).toEqual({
failed: 1,
skipped: 0,
succeeded: enabledRules.length,
total: enabledRules.length + 1,
});
expect(body.attributes.errors).toHaveLength(1);
expect(body.attributes.errors).toEqual(
expect.arrayContaining([
{
message: 'Cannot bulk fill gaps for a disabled rule',
status_code: 500,
err_code: 'RULE_FILL_GAPS_DISABLED_RULE',
rules: [{ id: disabledRule.id, name: disabledRule.name }],
},
])
);
const expectedUpdatedRules = Object.values(generatedGapEvents).map((event) => event.rule);
expect(body.attributes.results).toEqual({
updated: expect.arrayContaining(
expectedUpdatedRules.map((expected) => expect.objectContaining(expected))
),
created: [],
deleted: [],
skipped: [],
});
});
});
describe('overwrite_data_views', () => {
it('should add an index pattern to a rule and overwrite the data view when overwrite_data_views is true', async () => {
const ruleId = 'ruleId';

View file

@ -406,5 +406,164 @@ export default ({ getService }: FtrProviderContext): void => {
});
});
});
describe('@skipInServerless @skipInServerlessMKI schedule bulk gap filling action', () => {
it('should return all existing and enabled rules as succeeded', async () => {
const intervalInMinutes = 25;
const interval = `${intervalInMinutes}m`;
const createdRule1 = await createRule(
supertest,
log,
getCustomQueryRuleParams({
rule_id: 'rule-1',
enabled: true,
interval,
})
);
const createdRule2 = await createRule(
supertest,
log,
getCustomQueryRuleParams({
rule_id: 'rule-2',
enabled: true,
interval,
})
);
const endDate = moment();
const startDate = endDate.clone().subtract(1, 'h');
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: { dry_run: true },
body: {
ids: [createdRule1.id, createdRule2.id],
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: startDate.toISOString(),
end_date: endDate.toISOString(),
},
},
})
.expect(200);
expect(body.attributes.summary).toEqual({
failed: 0,
skipped: 0,
succeeded: 2,
total: 2,
});
expect(body.attributes.errors).toBeUndefined();
});
it('should return 500 error if some rules do not exist', async () => {
const intervalInMinutes = 25;
const interval = `${intervalInMinutes}m`;
const createdRule1 = await createRule(
supertest,
log,
getCustomQueryRuleParams({
rule_id: 'rule-1',
enabled: true,
interval,
})
);
const endDate = moment();
const startDate = endDate.clone().subtract(1, 'h');
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: { dry_run: true },
body: {
ids: [createdRule1.id, 'rule-2'],
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: startDate.toISOString(),
end_date: endDate.toISOString(),
},
},
})
.expect(500);
expect(body.attributes.summary).toEqual({
failed: 1,
skipped: 0,
succeeded: 1,
total: 2,
});
expect(body.attributes.errors).toHaveLength(1);
expect(body.attributes.errors[0]).toEqual({
message: 'Rule not found',
status_code: 500,
rules: [
{
id: 'rule-2',
},
],
});
});
it('should return 500 error if some rules are disabled', async () => {
const intervalInMinutes = 25;
const interval = `${intervalInMinutes}m`;
const createdRule1 = await createRule(
supertest,
log,
getCustomQueryRuleParams({
rule_id: 'rule-1',
enabled: false,
interval,
})
);
const createdRule2 = await createRule(
supertest,
log,
getCustomQueryRuleParams({
rule_id: 'rule-2',
enabled: true,
interval,
})
);
const endDate = moment();
const startDate = endDate.clone().subtract(1, 'h');
const { body } = await securitySolutionApi
.performRulesBulkAction({
query: { dry_run: true },
body: {
ids: [createdRule1.id, createdRule2.id],
action: BulkActionTypeEnum.fill_gaps,
[BulkActionTypeEnum.fill_gaps]: {
start_date: startDate.toISOString(),
end_date: endDate.toISOString(),
},
},
})
.expect(500);
expect(body.attributes.summary).toEqual({
failed: 1,
skipped: 0,
succeeded: 1,
total: 2,
});
expect(body.attributes.errors).toHaveLength(1);
expect(body.attributes.errors[0]).toEqual({
err_code: 'RULE_FILL_GAPS_DISABLED_RULE',
message: 'Cannot bulk fill gaps for a disabled rule',
status_code: 500,
rules: [
{
id: createdRule1.id,
name: createdRule1.name,
},
],
});
});
});
});
};

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Client } from '@elastic/elasticsearch';
export const deleteAllGaps = async (client: Client) => {
const response = await client.deleteByQuery({
index: '.ds-.kibana-event-log-*',
refresh: true,
query: {
bool: {
must: [{ term: { 'event.action': 'gap' } }, { term: { 'event.provider': 'alerting' } }],
},
},
});
return response.deleted;
};

View file

@ -0,0 +1,161 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import moment from 'moment';
import { faker } from '@faker-js/faker';
import type { Client } from '@elastic/elasticsearch';
import { chunk } from 'lodash';
export type GapEvent = { _id?: string | null | undefined } & ReturnType<
typeof generateNonOverlappingGapEvents
>[0];
const ingestGapEvents = async (client: Client, gapEvents: GapEvent[]) => {
if (!client) throw new Error('Failed to get ES client');
const chunks = chunk(gapEvents, 1000);
for (const aChunk of chunks) {
const operations = aChunk.flatMap((doc) => [
{ create: { _index: '.kibana-event-log-ds' } },
doc,
]);
const response = await client.bulk({ operations, refresh: true });
// enrich the gaps with the created id
aChunk.forEach((gap, idx) => {
gap._id = response.items[idx]?.create?._id;
});
}
};
const generateNonOverlappingGapEvents = (
ruleId: string,
ruleName: string,
fromHours: number,
gapCount: number
) => {
const totalMinutes = fromHours * 60;
// Calculate maximum duration for each gap including spacing
const maxTimePerGap = Math.floor(totalMinutes / gapCount);
// Ensure minimum values are at least 1
const minGapDuration = Math.max(1, Math.min(5, Math.floor(maxTimePerGap * 0.6))); // 60% of available time
const maxGapDuration = Math.max(
minGapDuration + 1,
Math.min(30, Math.floor(maxTimePerGap * 0.8))
); // 80% of available time
const maxSpaceBetweenGaps = Math.max(1, Math.floor(maxTimePerGap * 0.2)); // 20% of available time
const gaps: Array<{ start: number; end: number }> = [];
let currentTimePoint = 0;
// Generate exactly gapCount gaps
for (let i = 0; i < gapCount; i++) {
const gapDuration = faker.number.int({
min: minGapDuration,
max: maxGapDuration,
});
const spaceBetweenGaps = faker.number.int({
min: 1,
max: maxSpaceBetweenGaps,
});
const gapEnd = currentTimePoint + spaceBetweenGaps;
const gapStart = gapEnd + gapDuration;
currentTimePoint = gapStart;
gaps.push({ start: gapEnd, end: gapStart });
}
// Convert minute-based gaps to actual gap events
return gaps.map((gap) => {
const gapDurationMs = (gap.end - gap.start) * 60 * 1000;
const gapEndTime = moment().subtract(gap.start, 'minutes');
const gapStartTime = moment().subtract(gap.end, 'minutes');
const range = {
gte: gapStartTime.toISOString(),
lte: gapEndTime.toISOString(),
};
return {
'@timestamp': range.lte,
event: {
provider: 'alerting',
action: 'gap',
kind: 'alert',
category: ['siem'],
},
kibana: {
alert: {
rule: {
revision: 1,
rule_type_id: 'siem.queryRule',
consumer: 'siem',
execution: {
uuid: faker.string.uuid(),
},
gap: {
range,
filled_intervals: [],
in_progress_intervals: [],
unfilled_intervals: [range],
status: 'unfilled',
total_gap_duration_ms: gapDurationMs,
filled_duration_ms: 0,
unfilled_duration_ms: gapDurationMs,
in_progress_duration_ms: 0,
},
},
},
saved_objects: [
{
rel: 'primary',
type: 'alert',
id: ruleId,
type_id: 'siem.queryRule',
},
],
space_ids: ['default'],
server_uuid: '5d29f261-1b85-4d90-9088-53e0e0e87c7c',
version: '9.1.0',
},
rule: {
id: ruleId,
license: 'basic',
category: 'siem.queryRule',
ruleset: 'siem',
name: ruleName,
},
ecs: {
version: '1.8.0',
},
};
});
};
export const generateGapsForRule = async (
esClient: Client,
rule: { id: string; name: string },
gapsCount: number
) => {
let gapEvents: GapEvent[] = [];
if (gapsCount > 0) {
// Generate non-overlapping gap events for each rule
gapEvents = generateNonOverlappingGapEvents(
rule.id,
rule.name || 'Unknown Rule',
24 * 90,
gapsCount
);
}
await ingestGapEvents(esClient, gapEvents);
return { gapEvents };
};