Risk score engine telemetry (#166787)

## Summary

Implement risk score engine telemetry

Here we use 2 types of telemetry:

Event base telemetry:
- Risk execution success. With parameters `scoresWritten`,
`taskCompletionTimeSeconds`, `isRunMoreThanInteval`
- Risk execution error

Usage telemetry:

- `unique_user_risk_score_total` and `unique_host_risk_score_total` -
Total amount from latest transform index for host and users
- `unique_user_risk_score_day` and `unique_host_risk_score_day` - Last
day amount from the latest transform index for host and users
- `all_host_risk_scores_total` and `all_user_risk_scores_total` - Total
amount from datastream for all risk executions for host and users
- `all_host_risk_scores_total_day` and `all_user_risk_scores_total_day`
- Last day amount from datastream for all risk executions for host and
users
- `all_risk_scores_index_size` and `unique_risk_scores_index_size` -
sizes of datastream of all risk scores and latest transform index

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Khristinin Nikita 2023-09-29 06:46:05 +02:00 committed by GitHub
parent 7aee2e2d55
commit 98a81d1b5e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1026 additions and 95 deletions

View file

@ -7,5 +7,9 @@
export const riskScoreBaseIndexName = 'risk-score';
export const allRiskScoreIndexPattern = '.ds-risk-score*';
export const latestRiskScoreIndexPattern = 'risk-score.risk-score-latest-*';
export const getRiskScoreLatestIndex = (spaceId = 'default') =>
`${riskScoreBaseIndexName}.risk-score-latest-${spaceId}`;

View file

@ -6,7 +6,7 @@
*/
import moment from 'moment';
import { convertDateToISOString } from './helpers';
import { convertDateToISOString, isExecutionDurationExceededInterval } from './helpers';
moment.suppressDeprecationWarnings = true;
@ -49,3 +49,17 @@ describe('convertDateToISOString', () => {
}).toThrowErrorMatchingInlineSnapshot(`"Could not convert string \\"hi mom\\" to ISO string"`);
});
});
describe('isExecutionDurationExceededInterval', () => {
it('return false if the execution duration interval not defiend', () => {
expect(isExecutionDurationExceededInterval(undefined, 1000)).toEqual(false);
});
it('return false if the execution duration is less than the interval', () => {
expect(isExecutionDurationExceededInterval('1m', 59)).toEqual(false);
});
it('return true if the execution duration is greater than the interval', () => {
expect(isExecutionDurationExceededInterval('1m', 61)).toEqual(true);
});
});

View file

@ -13,6 +13,7 @@ import {
type CoreStart,
} from '@kbn/core/server';
import { addSpaceIdToPath } from '@kbn/spaces-plugin/server';
import { parseIntervalAsSecond } from '@kbn/task-manager-plugin/server/lib/intervals';
import type { Range } from '../../../../common/risk_engine';
@ -72,3 +73,15 @@ export const buildScopedInternalSavedObjectsClientUnsafe = ({
excludedExtensions: [SECURITY_EXTENSION_ID],
});
};
export const isExecutionDurationExceededInterval = (
interval: string | undefined,
taskDurationInSeconds: number
): boolean => {
let executionDurationExceededInterval = false;
if (interval) {
executionDurationExceededInterval = taskDurationInSeconds > parseIntervalAsSecond(interval);
}
return executionDurationExceededInterval;
};

View file

@ -9,6 +9,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import { coreMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import { loggerMock } from '@kbn/logging-mocks';
import type { AnalyticsServiceSetup } from '@kbn/core/public';
import type { RiskScoreService } from '../risk_score_service';
import { riskScoreServiceMock } from '../risk_score_service.mock';
@ -30,6 +31,7 @@ describe('Risk Scoring Task', () => {
let mockTaskManagerSetup: ReturnType<typeof taskManagerMock.createSetup>;
let mockTaskManagerStart: ReturnType<typeof taskManagerMock.createStart>;
let mockLogger: ReturnType<typeof loggerMock.create>;
let mockTelemetry: jest.Mocked<AnalyticsServiceSetup>;
beforeEach(() => {
mockCore = coreMock.createSetup();
@ -38,6 +40,7 @@ describe('Risk Scoring Task', () => {
mockTaskManagerSetup = taskManagerMock.createSetup();
mockTaskManagerStart = taskManagerMock.createStart();
mockLogger = loggerMock.create();
mockTelemetry = mockCore.analytics;
});
describe('registerRiskScoringTask()', () => {
@ -48,6 +51,7 @@ describe('Risk Scoring Task', () => {
kibanaVersion: '8.10.0',
taskManager: mockTaskManagerSetup,
logger: mockLogger,
telemetry: mockTelemetry,
});
expect(mockTaskManagerSetup.registerTaskDefinitions).toHaveBeenCalled();
});
@ -59,6 +63,7 @@ describe('Risk Scoring Task', () => {
kibanaVersion: '8.10.0',
taskManager: undefined,
logger: mockLogger,
telemetry: mockTelemetry,
});
expect(mockTaskManagerSetup.registerTaskDefinitions).not.toHaveBeenCalled();
});
@ -208,6 +213,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).toHaveBeenCalledTimes(1);
});
@ -233,6 +239,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.getConfiguration).toHaveBeenCalledTimes(1);
@ -243,6 +250,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).toHaveBeenCalledTimes(2);
});
@ -263,6 +271,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).toHaveBeenCalledWith(
@ -310,6 +319,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).toHaveBeenCalledTimes(4);
@ -332,6 +342,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(initialState).not.toEqual(nextState);
@ -360,6 +371,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).not.toHaveBeenCalled();
@ -372,6 +384,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).not.toHaveBeenCalled();
@ -385,6 +398,7 @@ describe('Risk Scoring Task', () => {
getRiskScoreService: jest.fn().mockResolvedValueOnce(undefined),
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockRiskScoreService.calculateAndPersistScores).not.toHaveBeenCalled();
@ -393,6 +407,41 @@ describe('Risk Scoring Task', () => {
);
});
});
it('send success telemetry event', async () => {
await runTask({
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
expect(mockTelemetry.reportEvent).toHaveBeenCalledTimes(1);
expect(mockTelemetry.reportEvent).toHaveBeenCalledWith('risk_score_execution_success', {
executionDurationExceededInterval: false,
scoresWritten: 10,
taskDurationInSeconds: 0,
});
});
it('send error telemetry event', async () => {
mockRiskScoreService.calculateAndPersistScores.mockReset();
mockRiskScoreService.calculateAndPersistScores.mockImplementationOnce(() => {
throw new Error();
});
try {
await runTask({
getRiskScoreService,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});
} catch (err) {
expect(mockTelemetry.reportEvent).toHaveBeenCalledTimes(1);
expect(mockTelemetry.reportEvent).toHaveBeenCalledWith('risk_score_execution_error', {});
}
});
});
});
});

View file

@ -18,7 +18,7 @@ import type {
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import { getDataStreamAdapter } from '@kbn/alerting-plugin/server';
import type { AnalyticsServiceSetup } from '@kbn/core-analytics-server';
import type { AfterKeys, IdentifierType } from '../../../../common/risk_engine';
import type { StartPlugins } from '../../../plugin';
import { type RiskScoreService, riskScoreServiceFactory } from '../risk_score_service';
@ -30,8 +30,16 @@ import {
type LatestTaskStateSchema as RiskScoringTaskState,
} from './state';
import { INTERVAL, SCOPE, TIMEOUT, TYPE, VERSION } from './constants';
import { buildScopedInternalSavedObjectsClientUnsafe, convertRangeToISO } from './helpers';
import {
buildScopedInternalSavedObjectsClientUnsafe,
convertRangeToISO,
isExecutionDurationExceededInterval,
} from './helpers';
import { RiskScoreEntity } from '../../../../common/risk_engine/types';
import {
RISK_SCORE_EXECUTION_SUCCESS_EVENT,
RISK_SCORE_EXECUTION_ERROR_EVENT,
} from '../../telemetry/event_based/events';
const logFactory =
(logger: Logger, taskId: string) =>
@ -49,11 +57,13 @@ export const registerRiskScoringTask = ({
kibanaVersion,
logger,
taskManager,
telemetry,
}: {
getStartServices: StartServicesAccessor<StartPlugins>;
kibanaVersion: string;
logger: Logger;
taskManager: TaskManagerSetupContract | undefined;
telemetry: AnalyticsServiceSetup;
}): void => {
if (!taskManager) {
logger.info('Task Manager is unavailable; skipping risk engine task registration.');
@ -91,7 +101,7 @@ export const registerRiskScoringTask = ({
title: 'Entity Analytics Risk Engine - Risk Scoring Task',
timeout: TIMEOUT,
stateSchemaByVersion,
createTaskRunner: createTaskRunnerFactory({ logger, getRiskScoreService }),
createTaskRunner: createTaskRunnerFactory({ logger, getRiskScoreService, telemetry }),
},
});
};
@ -153,101 +163,132 @@ export const runTask = async ({
getRiskScoreService,
logger,
taskInstance,
telemetry,
}: {
logger: Logger;
getRiskScoreService: GetRiskScoreService;
taskInstance: ConcreteTaskInstance;
telemetry: AnalyticsServiceSetup;
}): Promise<{
state: RiskScoringTaskState;
}> => {
const state = taskInstance.state as RiskScoringTaskState;
const taskId = taskInstance.id;
const log = logFactory(logger, taskId);
const taskExecutionTime = moment().utc().toISOString();
log('running task');
try {
const taskStartTime = moment().utc().toISOString();
log('running task');
let scoresWritten = 0;
const updatedState = {
lastExecutionTimestamp: taskExecutionTime,
namespace: state.namespace,
runs: state.runs + 1,
scoresWritten,
};
let scoresWritten = 0;
const updatedState = {
lastExecutionTimestamp: taskStartTime,
namespace: state.namespace,
runs: state.runs + 1,
scoresWritten,
};
if (taskId !== getTaskId(state.namespace)) {
log('outdated task; exiting');
return { state: updatedState };
}
const riskScoreService = await getRiskScoreService(state.namespace);
if (!riskScoreService) {
log('risk score service is not available; exiting task');
return { state: updatedState };
}
const configuration = await riskScoreService.getConfiguration();
if (configuration == null) {
log(
'Risk engine configuration not found; exiting task. Please reinitialize the risk engine and try again'
);
return { state: updatedState };
}
const {
dataViewId,
enabled,
filter,
identifierType: configuredIdentifierType,
range: configuredRange,
pageSize,
} = configuration;
if (!enabled) {
log('risk engine is not enabled, exiting task');
return { state: updatedState };
}
const range = convertRangeToISO(configuredRange);
const { index, runtimeMappings } = await riskScoreService.getRiskInputsIndex({
dataViewId,
});
const identifierTypes: IdentifierType[] = configuredIdentifierType
? [configuredIdentifierType]
: [RiskScoreEntity.host, RiskScoreEntity.user];
await asyncForEach(identifierTypes, async (identifierType) => {
let isWorkComplete = false;
let afterKeys: AfterKeys = {};
while (!isWorkComplete) {
const result = await riskScoreService.calculateAndPersistScores({
afterKeys,
index,
filter,
identifierType,
pageSize,
range,
runtimeMappings,
weights: [],
});
isWorkComplete = isRiskScoreCalculationComplete(result);
afterKeys = result.after_keys;
scoresWritten += result.scores_written;
if (taskId !== getTaskId(state.namespace)) {
log('outdated task; exiting');
return { state: updatedState };
}
});
updatedState.scoresWritten = scoresWritten;
const riskScoreService = await getRiskScoreService(state.namespace);
if (!riskScoreService) {
log('risk score service is not available; exiting task');
return { state: updatedState };
}
log('task run completed');
return {
state: updatedState,
};
const configuration = await riskScoreService.getConfiguration();
if (configuration == null) {
log(
'Risk engine configuration not found; exiting task. Please reinitialize the risk engine and try again'
);
return { state: updatedState };
}
const {
dataViewId,
enabled,
filter,
identifierType: configuredIdentifierType,
range: configuredRange,
pageSize,
} = configuration;
if (!enabled) {
log('risk engine is not enabled, exiting task');
return { state: updatedState };
}
const range = convertRangeToISO(configuredRange);
const { index, runtimeMappings } = await riskScoreService.getRiskInputsIndex({
dataViewId,
});
const identifierTypes: IdentifierType[] = configuredIdentifierType
? [configuredIdentifierType]
: [RiskScoreEntity.host, RiskScoreEntity.user];
await asyncForEach(identifierTypes, async (identifierType) => {
let isWorkComplete = false;
let afterKeys: AfterKeys = {};
while (!isWorkComplete) {
const result = await riskScoreService.calculateAndPersistScores({
afterKeys,
index,
filter,
identifierType,
pageSize,
range,
runtimeMappings,
weights: [],
});
isWorkComplete = isRiskScoreCalculationComplete(result);
afterKeys = result.after_keys;
scoresWritten += result.scores_written;
}
});
updatedState.scoresWritten = scoresWritten;
const taskCompletionTime = moment().utc().toISOString();
const taskDurationInSeconds = moment(taskCompletionTime).diff(moment(taskStartTime), 'seconds');
const telemetryEvent = {
scoresWritten,
taskDurationInSeconds,
executionDurationExceededInterval: isExecutionDurationExceededInterval(
taskInstance?.schedule?.interval,
taskDurationInSeconds
),
};
telemetry.reportEvent(RISK_SCORE_EXECUTION_SUCCESS_EVENT.eventType, telemetryEvent);
log('task run completed');
log(JSON.stringify(telemetryEvent));
return {
state: updatedState,
};
} catch (e) {
telemetry.reportEvent(RISK_SCORE_EXECUTION_ERROR_EVENT.eventType, {});
throw e;
}
};
const createTaskRunnerFactory =
({ logger, getRiskScoreService }: { logger: Logger; getRiskScoreService: GetRiskScoreService }) =>
({
logger,
getRiskScoreService,
telemetry,
}: {
logger: Logger;
getRiskScoreService: GetRiskScoreService;
telemetry: AnalyticsServiceSetup;
}) =>
({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
run: async () => runTask({ getRiskScoreService, logger, taskInstance }),
run: async () => runTask({ getRiskScoreService, logger, taskInstance, telemetry }),
cancel: async () => {},
};
};

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { EventTypeOpts } from '@kbn/analytics-client';
export const RISK_SCORE_EXECUTION_SUCCESS_EVENT: EventTypeOpts<{
scoresWritten: number;
taskDurationInSeconds: number;
executionDurationExceededInterval: boolean;
}> = {
eventType: 'risk_score_execution_success',
schema: {
scoresWritten: {
type: 'long',
_meta: {
description: 'Number of risk scores written during this scoring task execution',
},
},
taskDurationInSeconds: {
type: 'long',
_meta: {
description: 'Duration (in seconds) of the current risk scoring task execution',
},
},
executionDurationExceededInterval: {
type: 'boolean',
_meta: {
description: `Whether the risk scoring task's duration exceeded its allocated interval`,
},
},
},
};
export const RISK_SCORE_EXECUTION_ERROR_EVENT: EventTypeOpts<{}> = {
eventType: 'risk_score_execution_error',
schema: {},
};
export const events = [RISK_SCORE_EXECUTION_SUCCESS_EVENT, RISK_SCORE_EXECUTION_ERROR_EVENT];

View file

@ -96,6 +96,7 @@ import { featureUsageService } from './endpoint/services/feature_usage';
import { actionCreateService } from './endpoint/services/actions';
import { setIsElasticCloudDeployment } from './lib/telemetry/helpers';
import { artifactService } from './lib/telemetry/artifact';
import { events } from './lib/telemetry/event_based/events';
import { endpointFieldsProvider } from './search_strategy/endpoint_fields';
import {
ENDPOINT_FIELDS_SEARCH_STRATEGY,
@ -106,6 +107,7 @@ import {
import { AppFeaturesService } from './lib/app_features_service/app_features_service';
import { registerRiskScoringTask } from './lib/risk_engine/tasks/risk_scoring_task';
import { registerProtectionUpdatesNoteRoutes } from './endpoint/routes/protection_updates_note';
import { latestRiskScoreIndexPattern, allRiskScoreIndexPattern } from '../common/risk_engine';
export type { SetupPlugins, StartPlugins, PluginSetup, PluginStart } from './plugin_contract';
@ -168,6 +170,8 @@ export class Plugin implements ISecuritySolutionPlugin {
initUiSettings(core.uiSettings, experimentalFeatures);
appFeaturesService.init(plugins.features);
events.forEach((eventConfig) => core.analytics.registerEventType(eventConfig));
this.ruleMonitoringService.setup(core, plugins);
if (experimentalFeatures.riskScoringPersistence) {
@ -176,6 +180,7 @@ export class Plugin implements ISecuritySolutionPlugin {
kibanaVersion: pluginContext.env.packageInfo.version,
logger: this.logger,
taskManager: plugins.taskManager,
telemetry: core.analytics,
});
}
@ -208,6 +213,10 @@ export class Plugin implements ISecuritySolutionPlugin {
ml: plugins.ml,
usageCollection: plugins.usageCollection,
logger,
riskEngineIndexPatterns: {
all: allRiskScoreIndexPattern,
latest: latestRiskScoreIndexPattern,
},
});
this.telemetryUsageCounter = plugins.usageCollection?.createUsageCounter(APP_ID);

View file

@ -12,6 +12,8 @@ import { getDetectionsMetrics } from './detections/get_metrics';
import { getInternalSavedObjectsClient } from './get_internal_saved_objects_client';
import { getEndpointMetrics } from './endpoint/get_metrics';
import { getDashboardMetrics } from './dashboards/get_dashboards_metrics';
import { riskEngineMetricsSchema } from './risk_engine/schema';
import { getRiskEngineMetrics } from './risk_engine/get_risk_engine_metrics';
export type RegisterCollector = (deps: CollectorDependencies) => void;
@ -19,6 +21,7 @@ export interface UsageData {
detectionMetrics: {};
endpointMetrics: {};
dashboardMetrics: DashboardMetrics;
riskEngineMetrics: {};
}
export const registerCollector: RegisterCollector = ({
@ -28,6 +31,7 @@ export const registerCollector: RegisterCollector = ({
ml,
usageCollection,
logger,
riskEngineIndexPatterns,
}) => {
if (!usageCollection) {
logger.debug('Usage collection is undefined, therefore returning early without registering it');
@ -2441,29 +2445,33 @@ export const registerCollector: RegisterCollector = ({
},
},
},
riskEngineMetrics: riskEngineMetricsSchema,
},
isReady: () => true,
fetch: async ({ esClient }: CollectorFetchContext): Promise<UsageData> => {
const savedObjectsClient = await getInternalSavedObjectsClient(core);
const [detectionMetrics, endpointMetrics, dashboardMetrics] = await Promise.allSettled([
getDetectionsMetrics({
eventLogIndex,
signalsIndex,
esClient,
savedObjectsClient,
logger,
mlClient: ml,
}),
getEndpointMetrics({ esClient, logger }),
getDashboardMetrics({
savedObjectsClient,
logger,
}),
]);
const [detectionMetrics, endpointMetrics, dashboardMetrics, riskEngineMetrics] =
await Promise.allSettled([
getDetectionsMetrics({
eventLogIndex,
signalsIndex,
esClient,
savedObjectsClient,
logger,
mlClient: ml,
}),
getEndpointMetrics({ esClient, logger }),
getDashboardMetrics({
savedObjectsClient,
logger,
}),
getRiskEngineMetrics({ esClient, logger, riskEngineIndexPatterns }),
]);
return {
detectionMetrics: detectionMetrics.status === 'fulfilled' ? detectionMetrics.value : {},
endpointMetrics: endpointMetrics.status === 'fulfilled' ? endpointMetrics.value : {},
dashboardMetrics: dashboardMetrics.status === 'fulfilled' ? dashboardMetrics.value : {},
riskEngineMetrics: riskEngineMetrics.status === 'fulfilled' ? riskEngineMetrics.value : {},
};
},
});

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SearchResponse, IndicesStatsResponse } from '@elastic/elasticsearch/lib/api/types';
export const getAggregationResultMock = ({
user,
host,
}: {
user: number;
host: number;
}): SearchResponse<
never,
{
user_name: {
value: number;
};
host_name: {
value: number;
};
}
> => ({
took: 171,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 0, failed: 0 },
hits: { total: { value: 10000, relation: 'gte' }, max_score: null, hits: [] },
aggregations: { user_name: { value: user }, host_name: { value: host } },
});
export const getStatsResultMock = ({ size }: { size: number }): IndicesStatsResponse => ({
_shards: { total: 2, successful: 1, failed: 0 },
_all: {
primaries: {
docs: { count: 200000, deleted: 0 },
shard_stats: { total_count: 1 },
store: {
size_in_bytes: size,
total_data_set_size_in_bytes: size,
reserved_in_bytes: 0,
},
},
},
});

View file

@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { getRiskEngineMetrics } from './get_risk_engine_metrics';
import { getAggregationResultMock, getStatsResultMock } from './get_risk_engine_metrics.mocks';
const riskEngineIndexPatterns = {
all: 'an-index-pattern',
latest: 'another-index-pattern',
};
describe('risk engine metrics', () => {
let esClient: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
describe('risk engine not installed', () => {
beforeEach(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
logger = loggingSystemMock.createLogger();
});
it('should return empty object', async () => {
esClient.indices.get.mockResponseOnce({});
const result = await getRiskEngineMetrics({
esClient,
logger,
riskEngineIndexPatterns,
});
expect(result).toEqual({});
});
});
describe('risk engine installed', () => {
beforeEach(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
logger = loggingSystemMock.createLogger();
});
it('should return metrics object', async () => {
esClient.search.mockResponseOnce(
getAggregationResultMock({
user: 100,
host: 200,
})
);
esClient.search.mockResponseOnce(
getAggregationResultMock({
user: 10,
host: 20,
})
);
esClient.search.mockResponseOnce(
getAggregationResultMock({
user: 1000,
host: 2000,
})
);
esClient.search.mockResponseOnce(
getAggregationResultMock({
user: 500,
host: 600,
})
);
esClient.indices.stats.mockResponseOnce(
getStatsResultMock({
size: 10000,
})
);
esClient.indices.stats.mockResponseOnce(
getStatsResultMock({
size: 5000,
})
);
const result = await getRiskEngineMetrics({
esClient,
logger,
riskEngineIndexPatterns,
});
expect(result).toEqual({
unique_user_risk_score_total: 100,
unique_host_risk_score_total: 200,
unique_user_risk_score_day: 10,
unique_host_risk_score_day: 20,
all_user_risk_scores_total: 1000,
all_host_risk_scores_total: 2000,
all_user_risk_scores_total_day: 500,
all_host_risk_scores_total_day: 600,
all_risk_scores_index_size: 0.01,
unique_risk_scores_index_size: 0.005,
});
});
it('should still return metrics object if some request return error', async () => {
esClient.search.mockResponseOnce(
getAggregationResultMock({
user: 100,
host: 200,
})
);
esClient.search.mockResponseOnce(
getAggregationResultMock({
user: 10,
host: 20,
})
);
esClient.search.mockResponseOnce(
getAggregationResultMock({
user: 1000,
host: 2000,
})
);
esClient.search.mockImplementationOnce(() => {
throw new Error('Connection Error');
});
esClient.indices.stats.mockResponseOnce(
getStatsResultMock({
size: 10000,
})
);
esClient.indices.stats.mockResponseOnce(
getStatsResultMock({
size: 5000,
})
);
const result = await getRiskEngineMetrics({
esClient,
logger,
riskEngineIndexPatterns,
});
expect(result).toEqual({
unique_user_risk_score_total: 100,
unique_host_risk_score_total: 200,
unique_user_risk_score_day: 10,
unique_host_risk_score_day: 20,
all_user_risk_scores_total: 1000,
all_host_risk_scores_total: 2000,
all_risk_scores_index_size: 0.01,
unique_risk_scores_index_size: 0.005,
});
});
});
});

View file

@ -0,0 +1,188 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger, ElasticsearchClient } from '@kbn/core/server';
import type { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { RiskEngineMetrics } from './types';
interface GetRiskEngineMetricsOptions {
esClient: ElasticsearchClient;
logger: Logger;
riskEngineIndexPatterns: {
all: string;
latest: string;
};
}
const allEnititiesByTypeAggregationBody = {
size: 0,
aggs: {
user_name: {
value_count: {
field: 'user.name',
},
},
host_name: {
value_count: {
field: 'host.name',
},
},
},
};
const getEntitiesAggregationData = async ({
esClient,
index,
logger,
hostMetricField,
userMetricField,
lastDay,
}: {
esClient: ElasticsearchClient;
index: string;
logger: Logger;
hostMetricField: string;
userMetricField: string;
lastDay: boolean;
}) => {
try {
const bodyRequest: SearchRequest['body'] = {
...allEnititiesByTypeAggregationBody,
};
if (lastDay) {
bodyRequest.query = {
range: {
'@timestamp': {
gte: 'now-24h',
lt: 'now',
},
},
};
}
const riskScoreAggsResponse = await esClient.search<
never,
{
user_name: {
value: number;
};
host_name: {
value: number;
};
}
>({
index,
body: bodyRequest,
});
return {
[userMetricField]: riskScoreAggsResponse?.aggregations?.user_name?.value,
[hostMetricField]: riskScoreAggsResponse?.aggregations?.host_name?.value,
};
} catch (err) {
logger.error(
`Error while fetching risk score metrics for ${hostMetricField} and ${userMetricField}: ${err}`
);
return {};
}
};
const getIndexSize = async ({
esClient,
index,
logger,
metricField,
}: {
esClient: ElasticsearchClient;
index: string;
logger: Logger;
metricField: string;
}) => {
try {
const riskScoreIndexStats = await esClient.indices.stats({
index,
});
const sizeInMb = (riskScoreIndexStats?._all?.primaries?.store?.size_in_bytes ?? 0) / 1e6;
return {
[metricField]: sizeInMb,
};
} catch (err) {
logger.error(`Error while fetching risk score metrics for ${metricField}: ${err}`);
return {};
}
};
export const getRiskEngineMetrics = async ({
esClient,
logger,
riskEngineIndexPatterns,
}: GetRiskEngineMetricsOptions): Promise<RiskEngineMetrics> => {
logger.info('Fetch risk engine metrics');
try {
const riskEngineIndexes = await esClient.indices.get({
index: `${riskEngineIndexPatterns.all}`,
});
const isRiskEngineExists = Object.keys(riskEngineIndexes).length > 0;
if (!isRiskEngineExists) {
return {};
}
const results = await Promise.all([
getEntitiesAggregationData({
esClient,
index: riskEngineIndexPatterns.latest,
logger,
lastDay: false,
hostMetricField: 'unique_host_risk_score_total',
userMetricField: 'unique_user_risk_score_total',
}),
getEntitiesAggregationData({
esClient,
index: riskEngineIndexPatterns.latest,
logger,
lastDay: true,
hostMetricField: 'unique_host_risk_score_day',
userMetricField: 'unique_user_risk_score_day',
}),
getEntitiesAggregationData({
esClient,
index: riskEngineIndexPatterns.all,
logger,
lastDay: false,
hostMetricField: 'all_host_risk_scores_total',
userMetricField: 'all_user_risk_scores_total',
}),
getEntitiesAggregationData({
esClient,
index: riskEngineIndexPatterns.all,
logger,
lastDay: true,
hostMetricField: 'all_host_risk_scores_total_day',
userMetricField: 'all_user_risk_scores_total_day',
}),
getIndexSize({
esClient,
logger,
index: riskEngineIndexPatterns.all,
metricField: 'all_risk_scores_index_size',
}),
getIndexSize({
esClient,
logger,
index: riskEngineIndexPatterns.latest,
metricField: 'unique_risk_scores_index_size',
}),
]);
return results.reduce((acc, curr) => ({ ...acc, ...curr }), {});
} catch (e) {
logger.error(`Error while fetching risk engine metrics: ${e.message}`);
return {};
}
};

View file

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { MakeSchemaFrom } from '@kbn/usage-collection-plugin/server';
import type { RiskEngineMetrics } from './types';
export const riskEngineMetricsSchema: MakeSchemaFrom<RiskEngineMetrics> = {
unique_user_risk_score_total: {
type: 'long',
_meta: {
description: 'Total unique user risk scores',
},
},
unique_host_risk_score_total: {
type: 'long',
_meta: {
description: 'Total unique host risk scores',
},
},
unique_user_risk_score_day: {
type: 'long',
_meta: {
description: 'Unique user risk scores per day',
},
},
unique_host_risk_score_day: {
type: 'long',
_meta: {
description: 'Unique host risk scores per day',
},
},
all_host_risk_scores_total: {
type: 'long',
_meta: {
description: 'Total number of host risk score records',
},
},
all_user_risk_scores_total: {
type: 'long',
_meta: {
description: 'Total number of user risk score records',
},
},
all_host_risk_scores_total_day: {
type: 'long',
_meta: {
description: 'Number of host risk score records per day',
},
},
all_user_risk_scores_total_day: {
type: 'long',
_meta: {
description: 'Number of user risk score records per day',
},
},
all_risk_scores_index_size: {
type: 'long',
_meta: {
description: 'Total size of the all Risk Score indices (MB)',
},
},
unique_risk_scores_index_size: {
type: 'long',
_meta: {
description: 'Total size of the unique Risk Score indices (MB)',
},
},
};

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export interface RiskEngineMetrics {
unique_host_risk_score_total?: number;
unique_user_risk_score_total?: number;
unique_user_risk_score_day?: number;
unique_host_risk_score_day?: number;
all_user_risk_scores_total?: number;
all_host_risk_scores_total?: number;
all_user_risk_scores_total_day?: number;
all_host_risk_scores_total_day?: number;
all_risk_scores_index_size?: number;
unique_risk_scores_index_size?: number;
}

View file

@ -29,6 +29,10 @@ export type CollectorDependencies = {
core: CoreSetup;
logger: Logger;
eventLogIndex: string;
riskEngineIndexPatterns: {
all: string;
latest: string;
};
} & Pick<SetupPlugins, 'ml' | 'usageCollection'>;
export interface AlertBucket {

View file

@ -15866,6 +15866,70 @@
}
}
}
},
"riskEngineMetrics": {
"properties": {
"unique_user_risk_score_total": {
"type": "long",
"_meta": {
"description": "Total unique user risk scores"
}
},
"unique_host_risk_score_total": {
"type": "long",
"_meta": {
"description": "Total unique host risk scores"
}
},
"unique_user_risk_score_day": {
"type": "long",
"_meta": {
"description": "Unique user risk scores per day"
}
},
"unique_host_risk_score_day": {
"type": "long",
"_meta": {
"description": "Unique host risk scores per day"
}
},
"all_host_risk_scores_total": {
"type": "long",
"_meta": {
"description": "Total number of host risk score records"
}
},
"all_user_risk_scores_total": {
"type": "long",
"_meta": {
"description": "Total number of user risk score records"
}
},
"all_host_risk_scores_total_day": {
"type": "long",
"_meta": {
"description": "Number of host risk score records per day"
}
},
"all_user_risk_scores_total_day": {
"type": "long",
"_meta": {
"description": "Number of user risk score records per day"
}
},
"all_risk_scores_index_size": {
"type": "long",
"_meta": {
"description": "Total size of the all Risk Score indices (MB)"
}
},
"unique_risk_scores_index_size": {
"type": "long",
"_meta": {
"description": "Total size of the unique Risk Score indices (MB)"
}
}
}
}
}
},

View file

@ -41,6 +41,7 @@ export default ({ loadTestFile }: FtrProviderContext): void => {
loadTestFile(require.resolve('./risk_engine/risk_score_preview'));
loadTestFile(require.resolve('./risk_engine/risk_score_calculation'));
loadTestFile(require.resolve('./risk_engine/risk_scoring_task_execution'));
loadTestFile(require.resolve('./risk_engine/telemetry_usage'));
loadTestFile(require.resolve('./set_alert_tags'));
});
};

View file

@ -23,6 +23,7 @@ import {
getRiskEngineTask,
cleanRiskEngineConfig,
waitForRiskEngineTaskToBeGone,
deleteRiskScoreIndices,
} from './utils';
// eslint-disable-next-line import/no-default-export
@ -270,6 +271,7 @@ export default ({ getService }: FtrProviderContext): void => {
afterEach(async () => {
await getService('spaces').delete(namespace);
await deleteRiskScoreIndices({ log, es, namespace });
});
it('calculates and persists risk scores for alert documents', async () => {

View file

@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { v4 as uuidv4 } from 'uuid';
import type { FtrProviderContext } from '../../../../common/ftr_provider_context';
import { deleteAllRules, deleteAllAlerts, getRiskEngineStats } from '../../../utils';
import {
buildDocument,
createAndSyncRuleAndAlertsFactory,
deleteRiskEngineTask,
deleteRiskScoreIndices,
waitForRiskScoresToBePresent,
riskEngineRouteHelpersFactory,
cleanRiskEngineConfig,
} from './utils';
import { dataGeneratorFactory } from '../../../utils/data_generator';
// eslint-disable-next-line import/no-default-export
export default ({ getService }: FtrProviderContext) => {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
const log = getService('log');
const retry = getService('retry');
const es = getService('es');
const createAndSyncRuleAndAlerts = createAndSyncRuleAndAlertsFactory({ supertest, log });
const riskEngineRoutes = riskEngineRouteHelpersFactory(supertest);
describe('Risk engine telemetry', async () => {
const { indexListOfDocuments } = dataGeneratorFactory({
es,
index: 'ecs_compliant',
log,
});
const kibanaServer = getService('kibanaServer');
before(async () => {
await esArchiver.load('x-pack/test/functional/es_archives/security_solution/ecs_compliant');
});
after(async () => {
await esArchiver.unload('x-pack/test/functional/es_archives/security_solution/ecs_compliant');
});
beforeEach(async () => {
await cleanRiskEngineConfig({ kibanaServer });
await deleteRiskEngineTask({ es, log });
await deleteRiskScoreIndices({ log, es });
await deleteAllAlerts(supertest, log, es);
await deleteAllRules(supertest, log);
});
describe('Risk engine not enabled', () => {
it('should has empty riskEngineMetrics', async () => {
await retry.try(async () => {
const stats = await getRiskEngineStats(supertest, log);
const expected = {};
expect(stats).to.eql(expected);
});
});
});
describe('Risk engine enabled', () => {
let hostId: string;
let userId: string;
beforeEach(async () => {
hostId = uuidv4();
const hostEvent = buildDocument({ host: { name: 'host-1' } }, hostId);
await indexListOfDocuments(
Array(10)
.fill(hostEvent)
.map((event, index) => ({
...event,
'host.name': `host-${index}`,
}))
);
userId = uuidv4();
const userEvent = buildDocument({ user: { name: 'user-1' } }, userId);
await indexListOfDocuments(
Array(10)
.fill(userEvent)
.map((event, index) => ({
...event,
'user.name': `user-${index}`,
}))
);
await createAndSyncRuleAndAlerts({
query: `id: ${userId} or id: ${hostId}`,
alerts: 20,
riskScore: 40,
});
await riskEngineRoutes.init();
});
afterEach(async () => {
await cleanRiskEngineConfig({ kibanaServer });
await deleteRiskEngineTask({ es, log });
await deleteRiskScoreIndices({ log, es });
await deleteAllAlerts(supertest, log, es);
await deleteAllRules(supertest, log);
});
it('should return riskEngineMetrics with expected values', async () => {
await waitForRiskScoresToBePresent({ es, log, scoreCount: 20 });
await retry.try(async () => {
const {
all_risk_scores_index_size: allRiskScoreIndexSize,
unique_risk_scores_index_size: uniqueRiskScoreIndexSize,
...otherStats
} = await getRiskEngineStats(supertest, log);
const expected = {
unique_host_risk_score_total: 0,
unique_user_risk_score_total: 0,
unique_user_risk_score_day: 0,
unique_host_risk_score_day: 0,
all_user_risk_scores_total: 10,
all_host_risk_scores_total: 10,
all_user_risk_scores_total_day: 10,
all_host_risk_scores_total_day: 10,
};
expect(otherStats).to.eql(expected);
expect(allRiskScoreIndexSize).to.be.greaterThan(0);
expect(uniqueRiskScoreIndexSize).to.be.greaterThan(0);
});
});
});
});
};

View file

@ -101,6 +101,27 @@ export const createAndSyncRuleAndAlertsFactory =
await waitForSignalsToBePresent(supertest, log, alerts, [id], namespace);
};
export const deleteRiskScoreIndices = async ({
log,
es,
namespace = 'default',
}: {
log: ToolingLog;
es: Client;
namespace?: string;
}) => {
try {
await Promise.allSettled([
es.indices.deleteDataStream({ name: [`risk-score.risk-score-${namespace}`] }),
es.indices.delete({
index: [`risk-score.risk-score-latest-${namespace}`],
}),
]);
} catch (e) {
log.error(`Error deleting risk score indices: ${e.message}`);
}
};
/**
* Deletes all risk scores from a given index or indices, defaults to `risk-score.risk-score-*`
* For use inside of afterEach blocks of tests

View file

@ -6,6 +6,7 @@
*/
import type { DetectionMetrics } from '@kbn/security-solution-plugin/server/usage/detections/types';
import type { RiskEngineMetrics } from '@kbn/security-solution-plugin/server/usage/risk_engine/types';
/**
* Given a body this will return the detection metrics from it.
@ -23,3 +24,20 @@ export const getDetectionMetricsFromBody = (
): DetectionMetrics => {
return body[0].stats.stack_stats.kibana.plugins.security_solution.detectionMetrics;
};
/**
* Given a body this will return the risk engine metrics from it.
* @param body The Stats body
* @returns Detection metrics
*/
export const getRiskEngineMetricsFromBody = (
body: Array<{
stats: {
stack_stats: {
kibana: { plugins: { security_solution: { riskEngineMetrics: {} } } };
};
};
}>
): RiskEngineMetrics => {
return body[0].stats.stack_stats.kibana.plugins.security_solution.riskEngineMetrics;
};

View file

@ -8,13 +8,17 @@
import type { ToolingLog } from '@kbn/tooling-log';
import type SuperTest from 'supertest';
import type { DetectionMetrics } from '@kbn/security-solution-plugin/server/usage/detections/types';
import type { RiskEngineMetrics } from '@kbn/security-solution-plugin/server/usage/risk_engine/types';
import {
ELASTIC_HTTP_VERSION_HEADER,
X_ELASTIC_INTERNAL_ORIGIN_REQUEST,
} from '@kbn/core-http-common';
import { getStatsUrl } from './get_stats_url';
import { getDetectionMetricsFromBody } from './get_detection_metrics_from_body';
import {
getDetectionMetricsFromBody,
getRiskEngineMetricsFromBody,
} from './get_detection_metrics_from_body';
/**
* Gets the stats from the stats endpoint.
@ -38,5 +42,32 @@ export const getStats = async (
)}, status: ${JSON.stringify(response.status)}`
);
}
return getDetectionMetricsFromBody(response.body);
};
/**
* Gets the stats from the stats endpoint.
* @param supertest The supertest agent.
* @returns The detection metrics
*/
export const getRiskEngineStats = async (
supertest: SuperTest.SuperTest<SuperTest.Test>,
log: ToolingLog
): Promise<RiskEngineMetrics> => {
const response = await supertest
.post(getStatsUrl())
.set('kbn-xsrf', 'true')
.set(ELASTIC_HTTP_VERSION_HEADER, '2')
.set(X_ELASTIC_INTERNAL_ORIGIN_REQUEST, 'kibana')
.send({ unencrypted: true, refreshCache: true });
if (response.status !== 200) {
log.error(
`Did not get an expected 200 "ok" when getting the stats for risk engine. CI issues could happen. Suspect this line if you are seeing CI issues. body: ${JSON.stringify(
response.body
)}, status: ${JSON.stringify(response.status)}`
);
}
return getRiskEngineMetricsFromBody(response.body);
};