mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
Co-authored-by: Dmitry Shevchenko <dmitrii.shevchenko@elastic.co>
This commit is contained in:
parent
17b76310c7
commit
e868bc1984
5 changed files with 325 additions and 14 deletions
|
@ -18,6 +18,7 @@ import { findRules } from '../../rules/find_rules';
|
|||
import { buildSiemResponse } from '../utils';
|
||||
import { buildRouteValidation } from '../../../../utils/build_validation/route_validation';
|
||||
import { transformFindAlerts } from './utils';
|
||||
import { getCurrentRuleStatuses } from './utils/get_current_rule_statuses';
|
||||
|
||||
// eslint-disable-next-line no-restricted-imports
|
||||
import { legacyGetBulkRuleActionsSavedObject } from '../../rule_actions/legacy_get_bulk_rule_actions_saved_object';
|
||||
|
@ -66,14 +67,12 @@ export const findRulesRoute = (
|
|||
filter: query.filter,
|
||||
fields: query.fields,
|
||||
});
|
||||
const alertIds = rules.data.map((rule) => rule.id);
|
||||
const ruleIds = rules.data.map((rule) => rule.id);
|
||||
|
||||
const spaceId = context.securitySolution.getSpaceId();
|
||||
const [currentStatusesByRuleId, ruleActions] = await Promise.all([
|
||||
execLogClient.getCurrentStatusBulk({
|
||||
ruleIds: alertIds,
|
||||
spaceId: context.securitySolution.getSpaceId(),
|
||||
}),
|
||||
legacyGetBulkRuleActionsSavedObject({ alertIds, savedObjectsClient, logger }),
|
||||
getCurrentRuleStatuses({ ruleIds, execLogClient, spaceId, logger }),
|
||||
legacyGetBulkRuleActionsSavedObject({ alertIds: ruleIds, savedObjectsClient, logger }),
|
||||
]);
|
||||
const transformed = transformFindAlerts(rules, currentStatusesByRuleId, ruleActions);
|
||||
if (transformed == null) {
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { chunk } from 'lodash';
|
||||
import { Logger } from 'src/core/server';
|
||||
import { initPromisePool } from '../../../../../utils/promise_pool';
|
||||
import { GetCurrentStatusBulkResult, IRuleExecutionLogClient } from '../../../rule_execution_log';
|
||||
|
||||
const RULES_PER_CHUNK = 1000;
|
||||
|
||||
interface GetCurrentRuleStatusesArgs {
|
||||
ruleIds: string[];
|
||||
execLogClient: IRuleExecutionLogClient;
|
||||
spaceId: string;
|
||||
logger: Logger;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the most recent execution status for each of the given rule IDs.
|
||||
* This method splits work into chunks so not to owerwhelm Elasticsearch
|
||||
* when fetching statuses for a big number of rules.
|
||||
*
|
||||
* @param ruleIds Rule IDs to fetch statuses for
|
||||
* @param execLogClient RuleExecutionLogClient
|
||||
* @param spaceId Current Space ID
|
||||
* @param logger Logger
|
||||
* @returns A dict with rule IDs as keys and rule statuses as values
|
||||
*
|
||||
* @throws AggregateError if any of the rule status requests fail
|
||||
*/
|
||||
export async function getCurrentRuleStatuses({
|
||||
ruleIds,
|
||||
execLogClient,
|
||||
spaceId,
|
||||
logger,
|
||||
}: GetCurrentRuleStatusesArgs): Promise<GetCurrentStatusBulkResult> {
|
||||
const { results, errors } = await initPromisePool({
|
||||
concurrency: 1,
|
||||
items: chunk(ruleIds, RULES_PER_CHUNK),
|
||||
executor: (ruleIdsChunk) =>
|
||||
execLogClient
|
||||
.getCurrentStatusBulk({
|
||||
ruleIds: ruleIdsChunk,
|
||||
spaceId,
|
||||
})
|
||||
.catch((error) => {
|
||||
logger.error(
|
||||
`Error fetching rule status: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
throw error;
|
||||
}),
|
||||
});
|
||||
|
||||
if (errors.length) {
|
||||
throw new AggregateError(errors, 'Error fetching rule statuses');
|
||||
}
|
||||
|
||||
// Merge all rule statuses into a single dict
|
||||
return Object.assign({}, ...results);
|
||||
}
|
|
@ -5,6 +5,7 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { chunk } from 'lodash';
|
||||
import { SavedObjectsFindOptionsReference } from 'kibana/server';
|
||||
import { Logger } from 'src/core/server';
|
||||
|
||||
|
@ -17,6 +18,7 @@ import { LegacyIRuleActionsAttributesSavedObjectAttributes } from './legacy_type
|
|||
import { legacyGetRuleActionsFromSavedObject } from './legacy_utils';
|
||||
// eslint-disable-next-line no-restricted-imports
|
||||
import { LegacyRulesActionsSavedObject } from './legacy_get_rule_actions_saved_object';
|
||||
import { initPromisePool } from '../../../utils/promise_pool';
|
||||
|
||||
/**
|
||||
* @deprecated Once we are confident all rules relying on side-car actions SO's have been migrated to SO references we should remove this function
|
||||
|
@ -39,15 +41,29 @@ export const legacyGetBulkRuleActionsSavedObject = async ({
|
|||
id: alertId,
|
||||
type: 'alert',
|
||||
}));
|
||||
const {
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
saved_objects,
|
||||
} = await savedObjectsClient.find<LegacyIRuleActionsAttributesSavedObjectAttributes>({
|
||||
type: legacyRuleActionsSavedObjectType,
|
||||
perPage: 10000,
|
||||
hasReference: references,
|
||||
const { results, errors } = await initPromisePool({
|
||||
concurrency: 1,
|
||||
items: chunk(references, 1000),
|
||||
executor: (referencesChunk) =>
|
||||
savedObjectsClient
|
||||
.find<LegacyIRuleActionsAttributesSavedObjectAttributes>({
|
||||
type: legacyRuleActionsSavedObjectType,
|
||||
perPage: 10000,
|
||||
hasReference: referencesChunk,
|
||||
})
|
||||
.catch((error) => {
|
||||
logger.error(
|
||||
`Error fetching rule actions: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
throw error;
|
||||
}),
|
||||
});
|
||||
return saved_objects.reduce(
|
||||
if (errors.length) {
|
||||
throw new AggregateError(errors, 'Error fetching rule actions');
|
||||
}
|
||||
|
||||
const savedObjects = results.flatMap((result) => result.saved_objects);
|
||||
return savedObjects.reduce(
|
||||
(acc: { [key: string]: LegacyRulesActionsSavedObject }, savedObject) => {
|
||||
const ruleAlertId = savedObject.references.find((reference) => {
|
||||
// Find the first rule alert and assume that is the one we want since we should only ever have 1.
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { initPromisePool } from './promise_pool';
|
||||
|
||||
const nextTick = () => new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
const initPoolWithTasks = ({ concurrency = 1, items = [1, 2, 3] }) => {
|
||||
const asyncTasks: Record<
|
||||
number,
|
||||
{
|
||||
status: 'pending' | 'resolved' | 'rejected';
|
||||
resolve: () => void;
|
||||
reject: () => void;
|
||||
}
|
||||
> = {};
|
||||
|
||||
const promisePool = initPromisePool({
|
||||
concurrency,
|
||||
items,
|
||||
executor: async (x) =>
|
||||
new Promise((resolve, reject) => {
|
||||
asyncTasks[x] = {
|
||||
status: 'pending',
|
||||
resolve: () => {
|
||||
asyncTasks[x].status = 'resolved';
|
||||
resolve(x);
|
||||
},
|
||||
reject: () => {
|
||||
asyncTasks[x].status = 'rejected';
|
||||
reject(new Error(`Error processing ${x}`));
|
||||
},
|
||||
};
|
||||
}),
|
||||
});
|
||||
|
||||
return [promisePool, asyncTasks] as const;
|
||||
};
|
||||
|
||||
describe('initPromisePool', () => {
|
||||
it('should execute async tasks', async () => {
|
||||
const { results, errors } = await initPromisePool({
|
||||
concurrency: 1,
|
||||
items: [1, 2, 3],
|
||||
executor: async (x) => x,
|
||||
});
|
||||
|
||||
expect(results).toEqual([1, 2, 3]);
|
||||
expect(errors).toEqual([]);
|
||||
});
|
||||
|
||||
it('should capture any errors that occur during tasks execution', async () => {
|
||||
const { results, errors } = await initPromisePool({
|
||||
concurrency: 1,
|
||||
items: [1, 2, 3],
|
||||
executor: async (x) => {
|
||||
throw new Error(`Error processing ${x}`);
|
||||
},
|
||||
});
|
||||
|
||||
expect(results).toEqual([]);
|
||||
expect(errors).toEqual([
|
||||
new Error(`Error processing 1`),
|
||||
new Error(`Error processing 2`),
|
||||
new Error(`Error processing 3`),
|
||||
]);
|
||||
});
|
||||
|
||||
it('should respect concurrency', async () => {
|
||||
const [promisePool, asyncTasks] = initPoolWithTasks({
|
||||
concurrency: 1,
|
||||
items: [1, 2, 3],
|
||||
});
|
||||
|
||||
// Check that we have only one task pending initially as concurrency = 1
|
||||
expect(asyncTasks).toEqual({
|
||||
1: expect.objectContaining({ status: 'pending' }),
|
||||
});
|
||||
|
||||
asyncTasks[1].resolve();
|
||||
await nextTick();
|
||||
|
||||
// Check that after resolving the first task, the second is pending
|
||||
expect(asyncTasks).toEqual({
|
||||
1: expect.objectContaining({ status: 'resolved' }),
|
||||
2: expect.objectContaining({ status: 'pending' }),
|
||||
});
|
||||
|
||||
asyncTasks[2].reject();
|
||||
await nextTick();
|
||||
|
||||
// Check that after rejecting the second task, the third is pending
|
||||
expect(asyncTasks).toEqual({
|
||||
1: expect.objectContaining({ status: 'resolved' }),
|
||||
2: expect.objectContaining({ status: 'rejected' }),
|
||||
3: expect.objectContaining({ status: 'pending' }),
|
||||
});
|
||||
|
||||
asyncTasks[3].resolve();
|
||||
await nextTick();
|
||||
|
||||
// Check that all taks have been settled
|
||||
expect(asyncTasks).toEqual({
|
||||
1: expect.objectContaining({ status: 'resolved' }),
|
||||
2: expect.objectContaining({ status: 'rejected' }),
|
||||
3: expect.objectContaining({ status: 'resolved' }),
|
||||
});
|
||||
|
||||
const { results, errors } = await promisePool;
|
||||
|
||||
// Check final reesuts
|
||||
expect(results).toEqual([1, 3]);
|
||||
expect(errors).toEqual([new Error(`Error processing 2`)]);
|
||||
});
|
||||
|
||||
it('should be possible to configure concurrency', async () => {
|
||||
const [promisePool, asyncTasks] = initPoolWithTasks({
|
||||
concurrency: 2,
|
||||
items: [1, 2, 3, 4, 5],
|
||||
});
|
||||
|
||||
// Check that we have only two tasks pending initially as concurrency = 2
|
||||
expect(asyncTasks).toEqual({
|
||||
1: expect.objectContaining({ status: 'pending' }),
|
||||
2: expect.objectContaining({ status: 'pending' }),
|
||||
});
|
||||
|
||||
asyncTasks[1].resolve();
|
||||
await nextTick();
|
||||
|
||||
// Check that after resolving the first task, the second and the third is pending
|
||||
expect(asyncTasks).toEqual({
|
||||
1: expect.objectContaining({ status: 'resolved' }),
|
||||
2: expect.objectContaining({ status: 'pending' }),
|
||||
3: expect.objectContaining({ status: 'pending' }),
|
||||
});
|
||||
|
||||
asyncTasks[2].reject();
|
||||
asyncTasks[3].reject();
|
||||
await nextTick();
|
||||
|
||||
// Check that after rejecting the second and the third tasks, the rest are pending
|
||||
expect(asyncTasks).toEqual({
|
||||
1: expect.objectContaining({ status: 'resolved' }),
|
||||
2: expect.objectContaining({ status: 'rejected' }),
|
||||
3: expect.objectContaining({ status: 'rejected' }),
|
||||
4: expect.objectContaining({ status: 'pending' }),
|
||||
5: expect.objectContaining({ status: 'pending' }),
|
||||
});
|
||||
|
||||
asyncTasks[4].resolve();
|
||||
asyncTasks[5].resolve();
|
||||
await nextTick();
|
||||
|
||||
// Check that all taks have been settled
|
||||
expect(asyncTasks).toEqual({
|
||||
1: expect.objectContaining({ status: 'resolved' }),
|
||||
2: expect.objectContaining({ status: 'rejected' }),
|
||||
3: expect.objectContaining({ status: 'rejected' }),
|
||||
4: expect.objectContaining({ status: 'resolved' }),
|
||||
5: expect.objectContaining({ status: 'resolved' }),
|
||||
});
|
||||
|
||||
const { results, errors } = await promisePool;
|
||||
|
||||
// Check final reesuts
|
||||
expect(results).toEqual([1, 4, 5]);
|
||||
expect(errors).toEqual([new Error(`Error processing 2`), new Error(`Error processing 3`)]);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
interface PromisePoolArgs<Item, Result> {
|
||||
concurrency?: number;
|
||||
items: Item[];
|
||||
executor: (item: Item) => Promise<Result>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs promises in batches. It ensures that the number of running async tasks
|
||||
* doesn't exceed the concurrency parameter passed to the function.
|
||||
*
|
||||
* @param concurrency - number of tasks run in parallel
|
||||
* @param items - array of items to be passes to async executor
|
||||
* @param executor - an async function to be called with each provided item
|
||||
*
|
||||
* @returns Struct holding results or errors of async tasks
|
||||
*/
|
||||
export const initPromisePool = async <Item, Result>({
|
||||
concurrency = 1,
|
||||
items,
|
||||
executor,
|
||||
}: PromisePoolArgs<Item, Result>) => {
|
||||
const tasks: Array<Promise<void>> = [];
|
||||
const results: Result[] = [];
|
||||
const errors: unknown[] = [];
|
||||
|
||||
for (const item of items) {
|
||||
// Check if the pool is full
|
||||
if (tasks.length >= concurrency) {
|
||||
// Wait for any first task to finish
|
||||
await Promise.race(tasks);
|
||||
}
|
||||
|
||||
const task: Promise<void> = executor(item)
|
||||
.then((result) => {
|
||||
results.push(result);
|
||||
})
|
||||
.catch(async (error) => {
|
||||
errors.push(error);
|
||||
})
|
||||
.finally(() => {
|
||||
tasks.splice(tasks.indexOf(task), 1);
|
||||
});
|
||||
|
||||
tasks.push(task);
|
||||
}
|
||||
|
||||
// Wait for all remaining tasks to finish
|
||||
await Promise.all(tasks);
|
||||
|
||||
return { results, errors };
|
||||
};
|
Loading…
Add table
Add a link
Reference in a new issue