[Security Solution] add endpoint metering task (#162203)

This commit is contained in:
Joey F. Poon 2023-07-26 07:04:35 -07:00 committed by GitHub
parent 41e236316c
commit 66fb375506
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 212 additions and 22 deletions

View file

@ -45,6 +45,8 @@ export const policyIndexPattern = 'metrics-endpoint.policy-*';
export const telemetryIndexPattern = 'metrics-endpoint.telemetry-*';
export const ENDPOINT_HEARTBEAT_INDEX = 'logs-endpoint.heartbeat-default';
// File storage indexes supporting endpoint Upload/download
export const FILE_STORAGE_METADATA_INDEX = getFileMetadataIndexName('endpoint');
export const FILE_STORAGE_DATA_INDEX = getFileDataIndexName('endpoint');

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export interface EndpointHeartbeat {
'@timestamp': string;
agent: {
id: string;
};
event: {
ingested: string;
};
}

View file

@ -1344,3 +1344,5 @@ export interface AdditionalOnSwitchChangeParams {
export type MetadataListResponse = BaseListResponse<HostInfo>;
export type { EndpointPrivileges } from './authz';
export type { EndpointHeartbeat } from './heartbeat';

View file

@ -18,7 +18,8 @@
"security",
"securitySolution",
"serverless",
"taskManager"
"taskManager",
"cloud"
],
"optionalPlugins": [
"securitySolutionEss"

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { getCspmUsageRecord } from './cspm_metring_task';
import { getCspmUsageRecord } from './cspm_metering_task';
import type { MeteringCallbackInput, UsageRecord } from '../types';
export const CLOUD_SECURITY_TASK_TYPE = 'Cloud_Security';
@ -16,6 +16,7 @@ export const cloudSecurityMetringCallback = async ({
logger,
taskId,
lastSuccessfulReport,
abortController,
}: MeteringCallbackInput): Promise<UsageRecord[]> => {
const projectId = cloudSetup?.serverless?.projectId || 'missing project id';
@ -32,6 +33,7 @@ export const cloudSecurityMetringCallback = async ({
logger,
taskId,
lastSuccessfulReport,
abortController,
});
if (cspmUsageRecord) {

View file

@ -9,14 +9,18 @@ import {
CSPM_POLICY_TEMPLATE,
CSP_LATEST_FINDINGS_DATA_VIEW,
} from '@kbn/cloud-security-posture-plugin/common/constants';
import { CLOUD_SECURITY_TASK_TYPE } from './cloud_security_metring';
import { CLOUD_SECURITY_TASK_TYPE } from './cloud_security_metering';
import { cloudSecurityMetringTaskProperties } from './metering_tasks_configs';
import type { CloudSecurityMeteringCallbackInput, UsageRecord } from '../types';
import type { UsageRecord, MeteringCallbackInput } from '../types';
const CSPM_CYCLE_SCAN_FREQUENT = '24h';
const CSPM_BUCKET_SUB_TYPE_NAME = 'CSPM';
interface CloudSecurityMeteringCallbackInput extends Omit<MeteringCallbackInput, 'cloudSetup'> {
projectId: string;
}
interface ResourceCountAggregation {
min_timestamp: MinTimestamp;
unique_resources: {
@ -49,7 +53,7 @@ export const getCspmUsageRecord = async ({
? new Date(response.aggregations.min_timestamp.value_as_string).toISOString()
: new Date().toISOString();
const usageRecords = {
const usageRecord = {
id: `${CLOUD_SECURITY_TASK_TYPE}:${CLOUD_SECURITY_TASK_TYPE}`,
usage_timestamp: minTimestamp,
creation_timestamp: new Date().toISOString(),
@ -67,7 +71,7 @@ export const getCspmUsageRecord = async ({
logger.debug(`Fetched CSPM metring data`);
return usageRecords;
return usageRecord;
} catch (err) {
logger.error(`Failed to fetch CSPM metering data ${err}`);
}

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { cloudSecurityMetringCallback } from './cloud_security_metring';
import { cloudSecurityMetringCallback } from './cloud_security_metering';
import type { MetringTaskProperties } from '../types';
const TASK_INTERVAL = 3600; // 1 hour

View file

@ -7,6 +7,6 @@
// TODO: this probably shouldn't live in code
const namespace = 'elastic-system';
const USAGE_SERVICE_BASE_API_URL = `http://usage-api.${namespace}/api`;
const USAGE_SERVICE_BASE_API_URL = `https://usage-api.${namespace}/api`;
const USAGE_SERVICE_BASE_API_URL_V1 = `${USAGE_SERVICE_BASE_API_URL}/v1`;
export const USAGE_SERVICE_USAGE_URL = `${USAGE_SERVICE_BASE_API_URL_V1}/usage`;

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const METERING_TASK = {
TITLE: 'Serverless Endpoint Usage Metering Task',
TYPE: 'serverless-security:endpoint-usage-reporting-task',
VERSION: '1.0.0',
INTERVAL: '5m',
};

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { endpointMeteringService } from './metering_service';

View file

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { AggregationsAggregate, SearchResponse } from '@elastic/elasticsearch/lib/api/types';
import type { ElasticsearchClient } from '@kbn/core/server';
// import { ENDPOINT_HEARTBEAT_INDEX } from '@kbn/security-solution-plugin/common/endpoint/constants';
import type { EndpointHeartbeat } from '@kbn/security-solution-plugin/common/endpoint/types';
import type { UsageRecord, MeteringCallbackInput } from '../../types';
// 1 hour
const SAMPLE_PERIOD_SECONDS = 3600;
// const THRESHOLD_MINUTES = 30;
export class EndpointMeteringService {
public async getUsageRecords({
taskId,
cloudSetup,
esClient,
abortController,
lastSuccessfulReport,
}: MeteringCallbackInput): Promise<UsageRecord[]> {
const heartbeatsResponse = await this.getHeartbeatsSince(
esClient,
abortController,
lastSuccessfulReport
);
return heartbeatsResponse.hits.hits.reduce((acc, { _source }) => {
if (!_source) {
return acc;
}
const { agent, event } = _source;
const record = this.buildMeteringRecord({
agentId: agent.id,
timestampStr: event.ingested,
taskId,
projectId: cloudSetup?.serverless?.projectId,
});
return [...acc, record];
}, [] as UsageRecord[]);
}
private async getHeartbeatsSince(
esClient: ElasticsearchClient,
abortController: AbortController,
since?: Date
): Promise<SearchResponse<EndpointHeartbeat, Record<string, AggregationsAggregate>>> {
const timestamp = new Date().toISOString();
return {
hits: {
hits: [
{
_source: {
'@timestamp': timestamp,
agent: {
id: '123',
},
event: {
ingested: timestamp,
},
},
},
],
},
} as SearchResponse<EndpointHeartbeat, Record<string, AggregationsAggregate>>;
// TODO: enable when heartbeat index is ready
// const thresholdDate = new Date(Date.now() - THRESHOLD_MINUTES * 60 * 1000);
// const searchFrom = since && since > thresholdDate ? since : thresholdDate;
// return esClient.search<EndpointHeartbeat>(
// {
// index: ENDPOINT_HEARTBEAT_INDEX,
// sort: 'event.ingested',
// query: {
// range: {
// 'event.ingested': {
// gt: searchFrom.toISOString(),
// },
// },
// },
// },
// { signal: abortController.signal }
// );
}
private buildMeteringRecord({
agentId,
timestampStr,
taskId,
projectId = '',
}: {
agentId: string;
timestampStr: string;
taskId: string;
projectId?: string;
}): UsageRecord {
const timestamp = new Date(timestampStr);
timestamp.setMinutes(0);
timestamp.setSeconds(0);
timestamp.setMilliseconds(0);
return {
id: `endpoint-${agentId}-${timestamp}`,
usage_timestamp: timestampStr,
creation_timestamp: timestampStr,
usage: {
type: 'security_solution_endpoint',
// TODO: get actual sub_type
sub_type: 'essential',
period_seconds: SAMPLE_PERIOD_SECONDS,
quantity: 1,
},
source: {
id: taskId,
instance_group_id: projectId,
},
};
}
}
export const endpointMeteringService = new EndpointMeteringService();

View file

@ -6,9 +6,12 @@
*/
import type { PluginInitializerContext, Plugin, CoreSetup, CoreStart } from '@kbn/core/server';
import type { ServerlessSecurityConfig } from './config';
import { getProductAppFeatures } from '../common/pli/pli_features';
import { getProductAppFeatures } from '../common/pli/pli_features';
import { METERING_TASK as ENDPOINT_METERING_TASK } from './endpoint/constants/metering';
import { endpointMeteringService } from './endpoint/services';
import type { ServerlessSecurityConfig } from './config';
import type {
SecuritySolutionServerlessPluginSetup,
SecuritySolutionServerlessPluginStart,
@ -29,6 +32,7 @@ export class SecuritySolutionServerlessPlugin
{
private config: ServerlessSecurityConfig;
private cspmUsageReportingTask: SecurityUsageReportingTask | undefined;
private endpointUsageReportingTask: SecurityUsageReportingTask | undefined;
constructor(private readonly initializerContext: PluginInitializerContext) {
this.config = this.initializerContext.config.get<ServerlessSecurityConfig>();
@ -57,6 +61,16 @@ export class SecuritySolutionServerlessPlugin
meteringCallback: cloudSecurityMetringTaskProperties.meteringCallback,
});
this.endpointUsageReportingTask = new SecurityUsageReportingTask({
core: _coreSetup,
logFactory: this.initializerContext.logger,
taskType: ENDPOINT_METERING_TASK.TYPE,
taskTitle: ENDPOINT_METERING_TASK.TITLE,
version: ENDPOINT_METERING_TASK.VERSION,
meteringCallback: endpointMeteringService.getUsageRecords,
taskManager: pluginsSetup.taskManager,
cloudSetup: pluginsSetup.cloudSetup,
});
return {};
}
@ -65,6 +79,11 @@ export class SecuritySolutionServerlessPlugin
taskManager: pluginsSetup.taskManager,
interval: cloudSecurityMetringTaskProperties.interval,
});
this.endpointUsageReportingTask?.start({
taskManager: pluginsSetup.taskManager,
interval: ENDPOINT_METERING_TASK.INTERVAL,
});
return {};
}

View file

@ -26,9 +26,9 @@ export class SecurityUsageReportingTask {
private wasStarted: boolean = false;
private cloudSetup: CloudSetup;
private taskType: string;
private taskId: string;
private version: string;
private logger: Logger;
private abortController = new AbortController();
constructor(setupContract: SecurityUsageReportingTaskSetupContract) {
const {
@ -45,7 +45,6 @@ export class SecurityUsageReportingTask {
this.cloudSetup = cloudSetup;
this.taskType = taskType;
this.version = version;
this.taskId = this.getTaskId(taskType, version);
this.logger = logFactory.get(this.taskId);
try {
@ -121,6 +120,7 @@ export class SecurityUsageReportingTask {
logger: this.logger,
taskId: this.taskId,
lastSuccessfulReport,
abortController: this.abortController,
});
this.logger.debug(`received usage records: ${JSON.stringify(usageRecords)}`);
@ -152,7 +152,7 @@ export class SecurityUsageReportingTask {
return { state };
};
private getTaskId = (taskType: string, version: string): string => {
return `${taskType}:${version}`;
};
private get taskId() {
return `${this.taskType}:${this.version}`;
}
}

View file

@ -16,7 +16,6 @@ import type {
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import type { CloudSetup } from '@kbn/cloud-plugin/server';
import type { SecuritySolutionEssPluginSetup } from '@kbn/security-solution-ess/server';
import type { MlPluginSetup } from '@kbn/ml-plugin/server';
@ -62,7 +61,6 @@ export interface UsageMetrics {
export interface UsageSource {
id: string;
instance_group_id: string;
instance_group_type?: string; // not seems part of step 1 fields https://github.com/elastic/mx-team/blob/main/teams/billing/services/usage_record_schema_v2.md
}
export interface SecurityUsageReportingTaskSetupContract {
@ -91,11 +89,7 @@ export interface MeteringCallbackInput {
logger: Logger;
taskId: string;
lastSuccessfulReport: Date;
}
export interface CloudSecurityMeteringCallbackInput
extends Omit<MeteringCallbackInput, 'cloudSetup'> {
projectId: string;
abortController: AbortController;
}
export interface MetringTaskProperties {