[Security Solution][Entity Analytics] Risk Scoring Task tuning (#167460)

## Summary

* Allows the risk scoring task to be cancelled, either by timeout or manually (see the sketch below this list)
* Increases task timeout to 10m (from 5m)
* Reduces the number of persisted risk inputs from 10 to 5
* Excludes `closed` alerts from risk scoring calculation
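
The cancellation support works cooperatively: the task runner closes over a `cancelled` flag, Kibana Task Manager flips it by calling `cancel()` (for example when the new 10m timeout is exceeded), and the scoring loop re-checks an `isCancelled()` callback at each page boundary. A minimal sketch of that pattern, with illustrative helper names (`doOnePageOfWork` is not from this PR):

```ts
// Cooperative cancellation, sketched outside of Kibana's Task Manager types.
type TaskRunner = { run: () => Promise<void>; cancel: () => Promise<void> };

const createTaskRunner = (doOnePageOfWork: () => Promise<boolean>): TaskRunner => {
  let cancelled = false;
  const isCancelled = () => cancelled;

  return {
    run: async () => {
      // Seed with isCancelled() so a task cancelled before any work starts exits immediately.
      let isWorkComplete = isCancelled();
      while (!isWorkComplete) {
        // Each iteration persists one page of scores, then re-checks the flag,
        // so cancellation takes effect at the next page boundary.
        const pageDone = await doOnePageOfWork();
        isWorkComplete = pageDone || isCancelled();
      }
      if (isCancelled()) {
        console.log('task was cancelled');
      }
    },
    // Task Manager invokes this on timeout or manual cancellation; it only flips
    // the flag, and the run loop is responsible for stopping itself.
    cancel: async () => {
      cancelled = true;
    },
  };
};
```

Because the flag is only consulted between pages, an in-flight page still finishes before the task stops; that is also why the diff seeds `isWorkComplete` with `isCancelled()` before entering the loop.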


### Checklist
- [x] Mocking time is hard; validate via smoke testing

- [ ] [Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html) was added for features that require explanation or tutorials
- [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios
Ryland Herrick 2023-10-02 12:27:46 -05:00 committed by GitHub
parent 40303a88c5
commit ea33696ba7
6 changed files with 98 additions and 33 deletions


@ -14,6 +14,7 @@ import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import {
ALERT_RISK_SCORE,
ALERT_RULE_NAME,
+ ALERT_WORKFLOW_STATUS,
EVENT_KIND,
} from '@kbn/rule-registry-plugin/common/technical_rule_data_field_names';
import type {
@ -158,7 +159,7 @@ const buildIdentifierTypeAggregation = ({
aggs: {
inputs: {
top_hits: {
- size: 10,
+ size: 5,
sort: { [ALERT_RISK_SCORE]: 'desc' },
_source: false,
docvalue_fields: ['@timestamp', ALERT_RISK_SCORE, ALERT_RULE_NAME],
@ -213,7 +214,11 @@ export const calculateRiskScores = async ({
withSecuritySpan('calculateRiskScores', async () => {
const now = new Date().toISOString();
- const filter = [{ exists: { field: ALERT_RISK_SCORE } }, filterFromRange(range)];
+ const filter = [
+   filterFromRange(range),
+   { bool: { must_not: { term: { [ALERT_WORKFLOW_STATUS]: 'closed' } } } },
+   { exists: { field: ALERT_RISK_SCORE } },
+ ];
if (!isEmpty(userFilter)) {
filter.push(userFilter as QueryDslQueryContainer);
}
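
For readers who do not have the `ALERT_*` constants memorized, the filter built in the hunk above roughly resolves to the clauses below. This is a sketch: it assumes the array is applied as `bool.filter` in the alerts search and that `filterFromRange(range)` expands to a plain `range` clause over the configured window; the literal field names are what the constants are expected to map to.

```ts
// Approximate shape of the alert filter after this change.
const filter = [
  // filterFromRange(range), e.g. the task's 30-day window
  { range: { '@timestamp': { gte: 'now-30d', lte: 'now' } } },
  // new: exclude alerts whose workflow status is 'closed'
  { bool: { must_not: { term: { 'kibana.alert.workflow_status': 'closed' } } } },
  // only alerts that carry a risk score contribute to the calculation
  { exists: { field: 'kibana.alert.risk_score' } },
];

const query = { bool: { filter } };
```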


@ -9,7 +9,7 @@ export const SCOPE = ['securitySolution'];
export const TYPE = 'risk_engine:risk_scoring';
export const VERSION = '0.0.1';
export const INTERVAL = '1h';
- export const TIMEOUT = '5m';
+ export const TIMEOUT = '10m';
export const RISK_SCORING_TASK_CONSTANTS = {
SCOPE,


@ -173,6 +173,7 @@ describe('Risk Scoring Task', () => {
describe('runTask()', () => {
let riskScoringTaskInstanceMock: ReturnType<typeof riskScoringTaskMock.createInstance>;
let getRiskScoreService: (namespace: string) => Promise<RiskScoreService>;
+ let mockIsCancelled: jest.MockedFunction<() => boolean>;
beforeEach(async () => {
await startRiskScoringTask({
@ -195,6 +196,7 @@ describe('Risk Scoring Task', () => {
pageSize: 10_000,
range: { start: 'now-30d', end: 'now' },
});
+ mockIsCancelled = jest.fn().mockReturnValue(false);
getRiskScoreService = jest.fn().mockResolvedValueOnce(mockRiskScoreService);
});
@ -213,6 +215,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).toHaveBeenCalledTimes(1);
@ -239,6 +242,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
@ -250,6 +254,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).toHaveBeenCalledTimes(2);
@ -271,6 +276,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
@ -319,6 +325,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).toHaveBeenCalledTimes(4);
@ -342,6 +349,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
@ -371,6 +379,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
@ -384,6 +393,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
@ -398,6 +408,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService: jest.fn().mockResolvedValueOnce(undefined),
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
+ isCancelled: mockIsCancelled,
telemetry: mockTelemetry,
});
@ -408,39 +419,78 @@ describe('Risk Scoring Task', () => {
});
});
- it('send success telemetry event', async () => {
-   await runTask({
-     getRiskScoreService,
-     logger: mockLogger,
-     taskInstance: riskScoringTaskInstanceMock,
-     telemetry: mockTelemetry,
-   });
-   expect(mockTelemetry.reportEvent).toHaveBeenCalledTimes(1);
-   expect(mockTelemetry.reportEvent).toHaveBeenCalledWith('risk_score_execution_success', {
-     executionDurationExceededInterval: false,
-     scoresWritten: 10,
-     taskDurationInSeconds: 0,
-   });
- });
- it('send error telemetry event', async () => {
-   mockRiskScoreService.calculateAndPersistScores.mockReset();
-   mockRiskScoreService.calculateAndPersistScores.mockImplementationOnce(() => {
-     throw new Error();
-   });
-   try {
-     await runTask({
-       getRiskScoreService,
-       logger: mockLogger,
-       taskInstance: riskScoringTaskInstanceMock,
-       telemetry: mockTelemetry,
-     });
-   } catch (err) {
-     expect(mockTelemetry.reportEvent).toHaveBeenCalledTimes(1);
-     expect(mockTelemetry.reportEvent).toHaveBeenCalledWith('risk_score_execution_error', {});
-   }
- });
+ describe('when the task timeout has been exceeded', () => {
+   beforeEach(() => {
+     mockIsCancelled.mockReturnValue(true);
+   });
+   it('stops task execution', async () => {
+     await runTask({
+       getRiskScoreService,
+       isCancelled: mockIsCancelled,
+       logger: mockLogger,
+       taskInstance: riskScoringTaskInstanceMock,
+       telemetry: mockTelemetry,
+     });
+     expect(mockRiskScoreService.calculateAndPersistScores).not.toHaveBeenCalled();
+   });
+   it('logs that the task was cancelled', async () => {
+     await runTask({
+       getRiskScoreService,
+       isCancelled: mockIsCancelled,
+       logger: mockLogger,
+       taskInstance: riskScoringTaskInstanceMock,
+       telemetry: mockTelemetry,
+     });
+     expect(mockLogger.info).toHaveBeenCalledWith(
+       expect.stringContaining('task was cancelled')
+     );
+   });
+ });
+ describe('telemetry', () => {
+   it('send success telemetry event', async () => {
+     await runTask({
+       getRiskScoreService,
+       isCancelled: mockIsCancelled,
+       logger: mockLogger,
+       taskInstance: riskScoringTaskInstanceMock,
+       telemetry: mockTelemetry,
+     });
+     expect(mockTelemetry.reportEvent).toHaveBeenCalledTimes(1);
+     expect(mockTelemetry.reportEvent).toHaveBeenCalledWith('risk_score_execution_success', {
+       executionDurationExceededInterval: false,
+       scoresWritten: 10,
+       taskDurationInSeconds: 0,
+     });
+   });
+   it('send error telemetry event', async () => {
+     mockRiskScoreService.calculateAndPersistScores.mockReset();
+     mockRiskScoreService.calculateAndPersistScores.mockImplementationOnce(() => {
+       throw new Error();
+     });
+     try {
+       await runTask({
+         getRiskScoreService,
+         isCancelled: mockIsCancelled,
+         logger: mockLogger,
+         taskInstance: riskScoringTaskInstanceMock,
+         telemetry: mockTelemetry,
+       });
+     } catch (err) {
+       expect(mockTelemetry.reportEvent).toHaveBeenCalledTimes(1);
+       expect(mockTelemetry.reportEvent).toHaveBeenCalledWith(
+         'risk_score_execution_error',
+         {}
+       );
+     }
+   });
+ });
});
});


@ -161,11 +161,13 @@ export const removeRiskScoringTask = async ({
export const runTask = async ({
getRiskScoreService,
+ isCancelled,
logger,
taskInstance,
telemetry,
}: {
logger: Logger;
+ isCancelled: () => boolean;
getRiskScoreService: GetRiskScoreService;
taskInstance: ConcreteTaskInstance;
telemetry: AnalyticsServiceSetup;
@ -228,7 +230,7 @@ export const runTask = async ({
: [RiskScoreEntity.host, RiskScoreEntity.user];
await asyncForEach(identifierTypes, async (identifierType) => {
- let isWorkComplete = false;
+ let isWorkComplete = isCancelled();
let afterKeys: AfterKeys = {};
while (!isWorkComplete) {
const result = await riskScoreService.calculateAndPersistScores({
@ -242,7 +244,7 @@ export const runTask = async ({
weights: [],
});
- isWorkComplete = isRiskScoreCalculationComplete(result);
+ isWorkComplete = isRiskScoreCalculationComplete(result) || isCancelled();
afterKeys = result.after_keys;
scoresWritten += result.scores_written;
}
@ -265,6 +267,9 @@ export const runTask = async ({
telemetry.reportEvent(RISK_SCORE_EXECUTION_SUCCESS_EVENT.eventType, telemetryEvent);
+ if (isCancelled()) {
+   log('task was cancelled');
+ }
log('task run completed');
log(JSON.stringify(telemetryEvent));
return {
@ -287,8 +292,13 @@ const createTaskRunnerFactory =
telemetry: AnalyticsServiceSetup;
}) =>
({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
+ let cancelled = false;
+ const isCancelled = () => cancelled;
return {
- run: async () => runTask({ getRiskScoreService, logger, taskInstance, telemetry }),
- cancel: async () => {},
+ run: async () =>
+   runTask({ getRiskScoreService, isCancelled, logger, taskInstance, telemetry }),
+ cancel: async () => {
+   cancelled = true;
+ },
};
};


@ -24,7 +24,7 @@ const getDefaultRiskEngineConfiguration = ({
filter: {},
identifierType: undefined,
interval: '1h',
- pageSize: 10_000,
+ pageSize: 3_500,
range: { start: 'now-30d', end: 'now' },
});


@ -310,7 +310,7 @@ export default ({ getService }: FtrProviderContext) => {
enabled: true,
filter: {},
interval: '1h',
- pageSize: 10000,
+ pageSize: 3500,
range: {
end: 'now',
start: 'now-30d',