[serverless] add cspm metering functionality (#162019)

This commit is contained in:
Ido Cohen 2023-07-25 13:17:45 +03:00 committed by GitHub
parent 1c3f4a8543
commit 707ed134be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 333 additions and 82 deletions

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getCspmUsageRecord } from './cspm_metring_task';
import type { MeteringCallbackInput, UsageRecord } from '../types';
export const CLOUD_SECURITY_TASK_TYPE = 'Cloud_Security';
export const cloudSecurityMetringCallback = async ({
esClient,
cloudSetup,
logger,
taskId,
lastSuccessfulReport,
}: MeteringCallbackInput): Promise<UsageRecord[]> => {
const projectId = cloudSetup?.serverless?.projectId || 'missing project id';
if (!cloudSetup?.serverless?.projectId) {
logger.error('no project id found');
}
try {
const cloudSecurityUsageRecords: UsageRecord[] = [];
const cspmUsageRecord = await getCspmUsageRecord({
esClient,
projectId,
logger,
taskId,
lastSuccessfulReport,
});
if (cspmUsageRecord) {
cloudSecurityUsageRecords.push(cspmUsageRecord);
}
return cloudSecurityUsageRecords;
} catch (err) {
logger.error(`Failed to fetch Cloud Security metering data ${err}`);
return [];
}
};

View file

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
CSPM_POLICY_TEMPLATE,
CSP_LATEST_FINDINGS_DATA_VIEW,
} from '@kbn/cloud-security-posture-plugin/common/constants';
import { CLOUD_SECURITY_TASK_TYPE } from './cloud_security_metring';
import { cloudSecurityMetringTaskProperties } from './metering_tasks_configs';
import type { CloudSecurityMeteringCallbackInput, UsageRecord } from '../types';
const CSPM_CYCLE_SCAN_FREQUENT = '24h';
const CSPM_BUCKET_SUB_TYPE_NAME = 'CSPM';
interface ResourceCountAggregation {
min_timestamp: MinTimestamp;
unique_resources: {
value: number;
};
}
interface MinTimestamp {
value: number;
value_as_string: string;
}
export const getCspmUsageRecord = async ({
esClient,
projectId,
logger,
taskId,
}: CloudSecurityMeteringCallbackInput): Promise<UsageRecord | undefined> => {
try {
const response = await esClient.search<unknown, ResourceCountAggregation>(
getFindingsByResourceAggQuery()
);
if (!response.aggregations) {
return;
}
const cspmResourceCount = response.aggregations.unique_resources.value;
const minTimestamp = response.aggregations
? new Date(response.aggregations.min_timestamp.value_as_string).toISOString()
: new Date().toISOString();
const usageRecords = {
id: `${CLOUD_SECURITY_TASK_TYPE}:${CLOUD_SECURITY_TASK_TYPE}`,
usage_timestamp: minTimestamp,
creation_timestamp: new Date().toISOString(),
usage: {
type: CLOUD_SECURITY_TASK_TYPE,
sub_type: CSPM_BUCKET_SUB_TYPE_NAME,
quantity: cspmResourceCount,
period_seconds: cloudSecurityMetringTaskProperties.periodSeconds,
},
source: {
id: taskId,
instance_group_id: projectId,
},
};
logger.debug(`Fetched CSPM metring data`);
return usageRecords;
} catch (err) {
logger.error(`Failed to fetch CSPM metering data ${err}`);
}
};
export const getFindingsByResourceAggQuery = () => ({
index: CSP_LATEST_FINDINGS_DATA_VIEW,
query: {
bool: {
must: [
{
term: {
'rule.benchmark.posture_type': CSPM_POLICY_TEMPLATE,
},
},
{
range: {
'@timestamp': {
gte: `now-${CSPM_CYCLE_SCAN_FREQUENT}`, // the "look back" period should be the same as the scan interval
},
},
},
],
},
},
size: 0,
aggs: {
unique_resources: {
cardinality: {
field: 'resource.id',
precision_threshold: 3000, // default = 3000 note note that even with a threshold as low as 100, the error remains very low 1-6% even when counting millions of items. https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_counts_are_approximate
},
},
min_timestamp: {
min: {
field: '@timestamp',
},
},
},
});

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { cloudSecurityMetringCallback } from './cloud_security_metring';
import type { MetringTaskProperties } from '../types';
const TASK_INTERVAL = 3600; // 1 hour
export const cloudSecurityMetringTaskProperties: MetringTaskProperties = {
taskType: 'cloud-security-usage-reporting-task',
taskTitle: 'Cloud Security Metring Periodic Tasks',
meteringCallback: cloudSecurityMetringCallback,
interval: `${TASK_INTERVAL.toString()}s`,
periodSeconds: TASK_INTERVAL,
version: '1',
};

View file

@ -15,7 +15,7 @@ export class UsageReportingService {
public async reportUsage(records: UsageRecord[]): Promise<Response> {
return fetch(USAGE_SERVICE_USAGE_URL, {
method: 'post',
body: JSON.stringify([records]),
body: JSON.stringify(records),
headers: { 'Content-Type': 'application/json' },
});
}

View file

@ -15,6 +15,8 @@ import type {
SecuritySolutionServerlessPluginSetupDeps,
SecuritySolutionServerlessPluginStartDeps,
} from './types';
import { SecurityUsageReportingTask } from './task_manager/usage_reporting_task';
import { cloudSecurityMetringTaskProperties } from './cloud_security/metering_tasks_configs';
export class SecuritySolutionServerlessPlugin
implements
@ -26,6 +28,7 @@ export class SecuritySolutionServerlessPlugin
>
{
private config: ServerlessSecurityConfig;
private cspmUsageReportingTask: SecurityUsageReportingTask | undefined;
constructor(private readonly initializerContext: PluginInitializerContext) {
this.config = this.initializerContext.config.get<ServerlessSecurityConfig>();
@ -35,17 +38,33 @@ export class SecuritySolutionServerlessPlugin
// securitySolutionEss plugin should always be disabled when securitySolutionServerless is enabled.
// This check is an additional layer of security to prevent double registrations when
// `plugins.forceEnableAllPlugins` flag is enabled).
const shouldRegister = pluginsSetup.securitySolutionEss == null;
if (shouldRegister) {
pluginsSetup.securitySolution.setAppFeatures(getProductAppFeatures(this.config.productTypes));
}
pluginsSetup.ml.setFeaturesEnabled({ ad: true, dfa: true, nlp: false });
this.cspmUsageReportingTask = new SecurityUsageReportingTask({
core: _coreSetup,
logFactory: this.initializerContext.logger,
taskManager: pluginsSetup.taskManager,
cloudSetup: pluginsSetup.cloudSetup,
taskType: cloudSecurityMetringTaskProperties.taskType,
taskTitle: cloudSecurityMetringTaskProperties.taskTitle,
version: cloudSecurityMetringTaskProperties.version,
meteringCallback: cloudSecurityMetringTaskProperties.meteringCallback,
});
return {};
}
public start(_coreStart: CoreStart, pluginsSetup: SecuritySolutionServerlessPluginStartDeps) {
this.cspmUsageReportingTask?.start({
taskManager: pluginsSetup.taskManager,
interval: cloudSecurityMetringTaskProperties.interval,
});
return {};
}

View file

@ -6,79 +6,71 @@
*/
import type { Response } from 'node-fetch';
import type { CoreSetup, ElasticsearchClient, Logger, LoggerFactory } from '@kbn/core/server';
import type {
ConcreteTaskInstance,
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import type { CoreSetup, Logger } from '@kbn/core/server';
import type { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import type { CloudSetup } from '@kbn/cloud-plugin/server';
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
import type { UsageRecord } from '../types';
import { usageReportingService } from '../common/services';
import type {
MeteringCallback,
SecurityUsageReportingTaskStartContract,
SecurityUsageReportingTaskSetupContract,
} from '../types';
const SCOPE = ['serverlessSecurity'];
const TIMEOUT = '1m';
export const VERSION = '1.0.0';
type MeteringCallback = (metringCallbackInput: MeteringCallbackInput) => UsageRecord[];
export interface MeteringCallbackInput {
esClient: ElasticsearchClient;
lastSuccessfulReport: Date;
}
export interface CloudSecurityUsageReportingTaskSetupContract {
taskType: string;
taskTitle: string;
meteringCallback: MeteringCallback;
logFactory: LoggerFactory;
core: CoreSetup;
taskManager: TaskManagerSetupContract;
}
export interface SecurityMetadataTaskStartContract {
taskType: string;
interval: string;
version: string;
taskManager: TaskManagerStartContract;
}
export class SecurityUsageReportingTask {
private logger: Logger;
private wasStarted: boolean = false;
private cloudSetup: CloudSetup;
private taskType: string;
private taskId: string;
private version: string;
private logger: Logger;
constructor(setupContract: CloudSecurityUsageReportingTaskSetupContract) {
const { taskType, taskTitle, logFactory, core, taskManager, meteringCallback } = setupContract;
constructor(setupContract: SecurityUsageReportingTaskSetupContract) {
const {
core,
logFactory,
taskManager,
cloudSetup,
taskType,
taskTitle,
version,
meteringCallback,
} = setupContract;
this.logger = logFactory.get(this.getTaskId(taskType));
taskManager.registerTaskDefinitions({
[taskType]: {
title: taskTitle,
timeout: TIMEOUT,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
run: async () => {
return this.runTask(taskInstance, core, meteringCallback);
},
// TODO
cancel: async () => {},
};
this.cloudSetup = cloudSetup;
this.taskType = taskType;
this.version = version;
this.taskId = this.getTaskId(taskType, version);
this.logger = logFactory.get(this.taskId);
try {
taskManager.registerTaskDefinitions({
[taskType]: {
title: taskTitle,
timeout: TIMEOUT,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
run: async () => {
return this.runTask(taskInstance, core, meteringCallback);
},
cancel: async () => {},
};
},
},
},
});
});
this.logger.info(`Scheduled task successfully ${taskTitle}`);
} catch (err) {
this.logger.error(`Failed to setup task ${taskType}, ${err} `);
}
}
public start = async ({
taskManager,
taskType,
interval,
version,
}: SecurityMetadataTaskStartContract) => {
public start = async ({ taskManager, interval }: SecurityUsageReportingTaskStartContract) => {
if (!taskManager) {
this.logger.error('missing required service during start');
return;
}
@ -86,8 +78,8 @@ export class SecurityUsageReportingTask {
try {
await taskManager.ensureScheduled({
id: this.getTaskId(taskType),
taskType,
id: this.taskId,
taskType: this.taskType,
scope: SCOPE,
schedule: {
interval,
@ -95,7 +87,7 @@ export class SecurityUsageReportingTask {
state: {
lastSuccessfulReport: null,
},
params: { version },
params: { version: this.version },
});
} catch (e) {
this.logger.debug(`Error scheduling task, received ${e.message}`);
@ -112,27 +104,47 @@ export class SecurityUsageReportingTask {
this.logger.debug('[runTask()] Aborted. Task not started yet');
return;
}
// Check that this task is current
if (taskInstance.id !== this.getTaskId(taskInstance.taskType)) {
if (taskInstance.id !== this.taskId) {
// old task, die
throwUnrecoverableError(new Error('Outdated task version'));
}
const [{ elasticsearch }] = await core.getStartServices();
const esClient = elasticsearch.client.asInternalUser;
const lastSuccessfulReport = taskInstance.state.lastSuccessfulReport;
const usageRecords = meteringCallback({ esClient, lastSuccessfulReport });
const usageRecords = await meteringCallback({
esClient,
cloudSetup: this.cloudSetup,
logger: this.logger,
taskId: this.taskId,
lastSuccessfulReport,
});
this.logger.debug(`received usage records: ${JSON.stringify(usageRecords)}`);
let usageReportResponse: Response | undefined;
try {
usageReportResponse = await usageReportingService.reportUsage(usageRecords);
} catch (e) {
this.logger.warn(JSON.stringify(e));
if (usageRecords.length !== 0) {
try {
usageReportResponse = await usageReportingService.reportUsage(usageRecords);
if (!usageReportResponse.ok) {
const errorResponse = await usageReportResponse.json();
this.logger.error(`API error ${usageReportResponse.status}, ${errorResponse}`);
return;
}
this.logger.info(
`usage records report was sent successfully: ${usageReportResponse.status}, ${usageReportResponse.statusText}`
);
} catch (err) {
this.logger.error(`Failed to send usage records report ${JSON.stringify(err)} `);
}
}
const state = {
lastSuccessfulReport:
usageReportResponse?.status === 201 ? new Date() : taskInstance.state.lastSuccessfulReport,
@ -140,7 +152,7 @@ export class SecurityUsageReportingTask {
return { state };
};
private getTaskId = (taskType: string): string => {
return `${taskType}:${VERSION}`;
private getTaskId = (taskType: string, version: string): string => {
return `${taskType}:${version}`;
};
}

View file

@ -4,7 +4,7 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { CoreSetup, ElasticsearchClient, Logger, LoggerFactory } from '@kbn/core/server';
import type { SecurityPluginSetup, SecurityPluginStart } from '@kbn/security-plugin/server';
import type { PluginSetupContract, PluginStartContract } from '@kbn/features-plugin/server';
import type {
@ -12,9 +12,10 @@ import type {
PluginStart as SecuritySolutionPluginStart,
} from '@kbn/security-solution-plugin/server';
import type {
TaskManagerSetupContract as TaskManagerPluginSetup,
TaskManagerStartContract as TaskManagerPluginStart,
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import type { CloudSetup } from '@kbn/cloud-plugin/server';
import type { SecuritySolutionEssPluginSetup } from '@kbn/security-solution-ess/server';
import type { MlPluginSetup } from '@kbn/ml-plugin/server';
@ -30,14 +31,15 @@ export interface SecuritySolutionServerlessPluginSetupDeps {
securitySolutionEss: SecuritySolutionEssPluginSetup;
features: PluginSetupContract;
ml: MlPluginSetup;
taskManager: TaskManagerPluginSetup;
taskManager: TaskManagerSetupContract;
cloudSetup: CloudSetup;
}
export interface SecuritySolutionServerlessPluginStartDeps {
security: SecurityPluginStart;
securitySolution: SecuritySolutionPluginStart;
features: PluginStartContract;
taskManager: TaskManagerPluginStart;
taskManager: TaskManagerStartContract;
}
export interface UsageRecord {
@ -60,5 +62,47 @@ export interface UsageMetrics {
export interface UsageSource {
id: string;
instance_group_id: string;
instance_group_type: string;
instance_group_type?: string; // not seems part of step 1 fields https://github.com/elastic/mx-team/blob/main/teams/billing/services/usage_record_schema_v2.md
}
export interface SecurityUsageReportingTaskSetupContract {
core: CoreSetup;
logFactory: LoggerFactory;
taskManager: TaskManagerSetupContract;
cloudSetup: CloudSetup;
taskType: string;
taskTitle: string;
version: string;
meteringCallback: MeteringCallback;
}
export interface SecurityUsageReportingTaskStartContract {
taskManager: TaskManagerStartContract;
interval: string;
}
export type MeteringCallback = (
metringCallbackInput: MeteringCallbackInput
) => Promise<UsageRecord[]>;
export interface MeteringCallbackInput {
esClient: ElasticsearchClient;
cloudSetup: CloudSetup;
logger: Logger;
taskId: string;
lastSuccessfulReport: Date;
}
export interface CloudSecurityMeteringCallbackInput
extends Omit<MeteringCallbackInput, 'cloudSetup'> {
projectId: string;
}
export interface MetringTaskProperties {
taskType: string;
taskTitle: string;
meteringCallback: MeteringCallback;
interval: string;
periodSeconds: number;
version: string;
}

View file

@ -11,9 +11,7 @@
"server/**/*.ts",
"../../../typings/**/*"
],
"exclude": [
"target/**/*"
],
"exclude": ["target/**/*"],
"kbn_references": [
"@kbn/core",
"@kbn/config-schema",
@ -32,6 +30,8 @@
"@kbn/features-plugin",
"@kbn/ml-plugin",
"@kbn/kibana-utils-plugin",
"@kbn/task-manager-plugin"
"@kbn/task-manager-plugin",
"@kbn/cloud-plugin",
"@kbn/cloud-security-posture-plugin"
]
}