[8.5] [Security Solution] Telemetry Task Metric Collector (#140503) (#141326)

# Backport

This will backport the following commits from `main` to `8.5`:
- [[Security Solution] Telemetry Task Metric Collector
(#140503)](https://github.com/elastic/kibana/pull/140503)

<!--- Backport version: 8.9.2 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"JD
Kurma","email":"JDKurma@gmail.com"},"sourceCommit":{"committedDate":"2022-09-21T23:45:35Z","message":"[Security
Solution] Telemetry Task Metric Collector (#140503)\n\n* task metric
logger\r\n\r\n* [CI] Auto-commit changed files from 'node scripts/eslint
--no-cache --fix'\r\n\r\n* add start time and end time to task
metric\r\n\r\n* update test\r\n\r\n* [CI] Auto-commit changed files from
'node scripts/eslint --no-cache --fix'\r\n\r\n* merge\r\n\r\n* [CI]
Auto-commit changed files from 'node scripts/eslint --no-cache
--fix'\r\n\r\n* return 0 in catch\r\n\r\nCo-authored-by: kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"9b7a65cf2a3b5be534374d3719ebe643b3a15cff","branchLabelMapping":{"^v8.6.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Feature:Telemetry","release_note:skip","Team:
SecuritySolution","auto-backport","v8.5.0"],"number":140503,"url":"https://github.com/elastic/kibana/pull/140503","mergeCommit":{"message":"[Security
Solution] Telemetry Task Metric Collector (#140503)\n\n* task metric
logger\r\n\r\n* [CI] Auto-commit changed files from 'node scripts/eslint
--no-cache --fix'\r\n\r\n* add start time and end time to task
metric\r\n\r\n* update test\r\n\r\n* [CI] Auto-commit changed files from
'node scripts/eslint --no-cache --fix'\r\n\r\n* merge\r\n\r\n* [CI]
Auto-commit changed files from 'node scripts/eslint --no-cache
--fix'\r\n\r\n* return 0 in catch\r\n\r\nCo-authored-by: kibanamachine
<42973632+kibanamachine@users.noreply.github.com>","sha":"9b7a65cf2a3b5be534374d3719ebe643b3a15cff"}},"sourceBranch":"main","suggestedTargetBranches":["8.5"],"targetPullRequestStates":[{"branch":"8.5","label":"v8.5.0","labelRegex":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: JD Kurma <JDKurma@gmail.com>
This commit is contained in:
Kibana Machine 2022-11-14 12:25:03 -05:00 committed by GitHub
parent be4cf7a6fd
commit de0247ff39
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 873 additions and 564 deletions

View file

@ -23,6 +23,8 @@ export const LIST_TRUSTED_APPLICATION = 'trusted_application';
export const INSIGHTS_CHANNEL = 'security-insights-v1';
export const TASK_METRICS_CHANNEL = 'task-metrics';
export const DEFAULT_ADVANCED_POLICY_CONFIG_SETTINGS = {
linux: {
advanced: {

View file

@ -24,6 +24,7 @@ import {
metricsResponseToValueListMetaData,
tlog,
setIsElasticCloudDeployment,
createTaskMetric,
} from './helpers';
import type { ESClusterInfo, ESLicense, ExceptionListItem } from './types';
import type { PolicyConfig, PolicyData } from '../../../common/endpoint/types';
@ -931,3 +932,42 @@ describe('test tlog', () => {
expect(logger.debug).toHaveBeenCalled();
});
});
// Unit tests for the createTaskMetric helper.
describe('test create task metrics', () => {
  test('can succeed when all parameters are given', async () => {
    const name = 'test';
    const passed = true;
    const begin = Date.now();
    // Sleep a touch over 10ms so the elapsed-time assertion below is meaningful.
    await new Promise((resolve) => setTimeout(resolve, 11));
    const metric = createTaskMetric(name, passed, begin);
    const {
      time_executed_in_ms: elapsedMs,
      start_time: start,
      end_time: end,
      ...remainder
    } = metric;
    expect(elapsedMs).toBeGreaterThan(10);
    // Only the name and passed flag should remain once the timing fields are stripped.
    expect(remainder).toEqual({
      name: 'test',
      passed: true,
    });
  });
  test('can succeed when error given', async () => {
    const name = 'test';
    const passed = false;
    const begin = Date.now();
    const failureReason = 'failed';
    const metric = createTaskMetric(name, passed, begin, failureReason);
    const {
      time_executed_in_ms: elapsedMs,
      start_time: start,
      end_time: end,
      ...remainder
    } = metric;
    // The failure reason must be carried through on the metric.
    expect(remainder).toEqual({
      name: 'test',
      passed: false,
      error_message: 'failed',
    });
  });
});

View file

@ -22,6 +22,7 @@ import type {
ValueListExceptionListResponseAggregation,
ValueListItemsResponseAggregation,
ValueListIndicatorMatchResponseAggregation,
TaskMetric,
} from './types';
import {
LIST_DETECTION_RULE_EXCEPTION,
@ -280,3 +281,20 @@ export const tlog = (logger: Logger, message: string) => {
logger.debug(message);
}
};
/**
 * Builds a TaskMetric record describing a single telemetry task run.
 *
 * @param name - human-readable task name reported to the task-metrics channel
 * @param passed - whether the task completed successfully
 * @param startTime - epoch milliseconds captured when the task began
 * @param errorMessage - failure reason, present only when the task did not pass
 * @returns the metric, including the elapsed execution time in milliseconds
 */
export const createTaskMetric = (
  name: string,
  passed: boolean,
  startTime: number,
  errorMessage?: string
): TaskMetric => {
  const finishedAt = Date.now();
  const metric = {
    name,
    passed,
    time_executed_in_ms: finishedAt - startTime,
    start_time: startTime,
    end_time: finishedAt,
    error_message: errorMessage,
  };
  return metric;
};

View file

@ -6,8 +6,12 @@
*/
import type { Logger } from '@kbn/core/server';
import { LIST_DETECTION_RULE_EXCEPTION, TELEMETRY_CHANNEL_LISTS } from '../constants';
import { batchTelemetryRecords, templateExceptionList, tlog } from '../helpers';
import {
LIST_DETECTION_RULE_EXCEPTION,
TELEMETRY_CHANNEL_LISTS,
TASK_METRICS_CHANNEL,
} from '../constants';
import { batchTelemetryRecords, templateExceptionList, tlog, createTaskMetric } from '../helpers';
import type { ITelemetryEventsSender } from '../sender';
import type { ITelemetryReceiver } from '../receiver';
import type { ExceptionListItem, ESClusterInfo, ESLicense, RuleSearchResult } from '../types';
@ -27,74 +31,87 @@ export function createTelemetryDetectionRuleListsTaskConfig(maxTelemetryBatch: n
sender: ITelemetryEventsSender,
taskExecutionPeriod: TaskExecutionPeriod
) => {
tlog(logger, 'test');
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
const startTime = Date.now();
const taskName = 'Security Solution Detection Rule Lists Telemetry';
try {
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
const clusterInfo =
clusterInfoPromise.status === 'fulfilled'
? clusterInfoPromise.value
: ({} as ESClusterInfo);
const licenseInfo =
licenseInfoPromise.status === 'fulfilled'
? licenseInfoPromise.value
: ({} as ESLicense | undefined);
const clusterInfo =
clusterInfoPromise.status === 'fulfilled'
? clusterInfoPromise.value
: ({} as ESClusterInfo);
const licenseInfo =
licenseInfoPromise.status === 'fulfilled'
? licenseInfoPromise.value
: ({} as ESLicense | undefined);
// Lists Telemetry: Detection Rules
// Lists Telemetry: Detection Rules
const { body: prebuiltRules } = await receiver.fetchDetectionRules();
const { body: prebuiltRules } = await receiver.fetchDetectionRules();
if (!prebuiltRules) {
tlog(logger, 'no prebuilt rules found');
return 0;
}
const cacheArray = prebuiltRules.hits.hits.reduce((cache, searchHit) => {
const rule = searchHit._source as RuleSearchResult;
const ruleId = rule.alert.params.ruleId;
const shouldNotProcess =
rule === null ||
rule === undefined ||
ruleId === null ||
ruleId === undefined ||
searchHit._source?.alert.params.exceptionsList.length === 0;
if (shouldNotProcess) {
return cache;
if (!prebuiltRules) {
tlog(logger, 'no prebuilt rules found');
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return 0;
}
cache.push(rule);
return cache;
}, [] as RuleSearchResult[]);
const cacheArray = prebuiltRules.hits.hits.reduce((cache, searchHit) => {
const rule = searchHit._source as RuleSearchResult;
const ruleId = rule.alert.params.ruleId;
const detectionRuleExceptions = [] as ExceptionListItem[];
for (const item of cacheArray) {
const ruleVersion = item.alert.params.version;
const shouldNotProcess =
rule === null ||
rule === undefined ||
ruleId === null ||
ruleId === undefined ||
searchHit._source?.alert.params.exceptionsList.length === 0;
for (const ex of item.alert.params.exceptionsList) {
const listItem = await receiver.fetchDetectionExceptionList(ex.list_id, ruleVersion);
for (const exceptionItem of listItem.data) {
detectionRuleExceptions.push(exceptionItem);
if (shouldNotProcess) {
return cache;
}
cache.push(rule);
return cache;
}, [] as RuleSearchResult[]);
const detectionRuleExceptions = [] as ExceptionListItem[];
for (const item of cacheArray) {
const ruleVersion = item.alert.params.version;
for (const ex of item.alert.params.exceptionsList) {
const listItem = await receiver.fetchDetectionExceptionList(ex.list_id, ruleVersion);
for (const exceptionItem of listItem.data) {
detectionRuleExceptions.push(exceptionItem);
}
}
}
}
const detectionRuleExceptionsJson = templateExceptionList(
detectionRuleExceptions,
clusterInfo,
licenseInfo,
LIST_DETECTION_RULE_EXCEPTION
);
tlog(logger, `Detection rule exception json length ${detectionRuleExceptionsJson.length}`);
const batches = batchTelemetryRecords(detectionRuleExceptionsJson, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, batch);
const detectionRuleExceptionsJson = templateExceptionList(
detectionRuleExceptions,
clusterInfo,
licenseInfo,
LIST_DETECTION_RULE_EXCEPTION
);
tlog(logger, `Detection rule exception json length ${detectionRuleExceptionsJson.length}`);
const batches = batchTelemetryRecords(detectionRuleExceptionsJson, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, batch);
}
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return detectionRuleExceptions.length;
} catch (err) {
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, false, startTime, err.message),
]);
return 0;
}
return detectionRuleExceptions.length;
},
};
}

View file

@ -46,5 +46,6 @@ describe('diagnostics telemetry task test', () => {
expect(mockTelemetryEventsSender.queueTelemetryEvents).toHaveBeenCalledWith(
testDiagnosticsAlerts.hits.hits.flatMap((doc) => [doc._source])
);
expect(mockTelemetryEventsSender.sendOnDemand).toBeCalledTimes(1);
});
});

View file

@ -6,11 +6,12 @@
*/
import type { Logger } from '@kbn/core/server';
import { tlog, getPreviousDiagTaskTimestamp } from '../helpers';
import { tlog, getPreviousDiagTaskTimestamp, createTaskMetric } from '../helpers';
import type { ITelemetryEventsSender } from '../sender';
import type { TelemetryEvent } from '../types';
import type { ITelemetryReceiver } from '../receiver';
import type { TaskExecutionPeriod } from '../task';
import { TASK_METRICS_CHANNEL } from '../constants';
export function createTelemetryDiagnosticsTaskConfig() {
return {
@ -27,26 +28,41 @@ export function createTelemetryDiagnosticsTaskConfig() {
sender: ITelemetryEventsSender,
taskExecutionPeriod: TaskExecutionPeriod
) => {
if (!taskExecutionPeriod.last) {
throw new Error('last execution timestamp is required');
}
const startTime = Date.now();
const taskName = 'Security Solution Telemetry Diagnostics task';
try {
if (!taskExecutionPeriod.last) {
throw new Error('last execution timestamp is required');
}
const response = await receiver.fetchDiagnosticAlerts(
taskExecutionPeriod.last,
taskExecutionPeriod.current
);
const response = await receiver.fetchDiagnosticAlerts(
taskExecutionPeriod.last,
taskExecutionPeriod.current
);
const hits = response.hits?.hits || [];
if (!Array.isArray(hits) || !hits.length) {
tlog(logger, 'no diagnostic alerts retrieved');
const hits = response.hits?.hits || [];
if (!Array.isArray(hits) || !hits.length) {
tlog(logger, 'no diagnostic alerts retrieved');
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return 0;
}
tlog(logger, `Received ${hits.length} diagnostic alerts`);
const diagAlerts: TelemetryEvent[] = hits.flatMap((h) =>
h._source != null ? [h._source] : []
);
sender.queueTelemetryEvents(diagAlerts);
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return diagAlerts.length;
} catch (err) {
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, false, startTime, err.message),
]);
return 0;
}
tlog(logger, `Received ${hits.length} diagnostic alerts`);
const diagAlerts: TelemetryEvent[] = hits.flatMap((h) =>
h._source != null ? [h._source] : []
);
sender.queueTelemetryEvents(diagAlerts);
return diagAlerts.length;
},
};
}

View file

@ -27,9 +27,10 @@ import {
getPreviousDailyTaskTimestamp,
isPackagePolicyList,
tlog,
createTaskMetric,
} from '../helpers';
import type { PolicyData } from '../../../../common/endpoint/types';
import { TELEMETRY_CHANNEL_ENDPOINT_META } from '../constants';
import { TELEMETRY_CHANNEL_ENDPOINT_META, TASK_METRICS_CHANNEL } from '../constants';
// Endpoint agent uses this Policy ID while it's installing.
const DefaultEndpointPolicyIdToIgnore = '00000000-0000-0000-0000-000000000000';
@ -58,294 +59,320 @@ export function createTelemetryEndpointTaskConfig(maxTelemetryBatch: number) {
sender: ITelemetryEventsSender,
taskExecutionPeriod: TaskExecutionPeriod
) => {
tlog(logger, 'test');
if (!taskExecutionPeriod.last) {
throw new Error('last execution timestamp is required');
}
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
const clusterInfo =
clusterInfoPromise.status === 'fulfilled'
? clusterInfoPromise.value
: ({} as ESClusterInfo);
const licenseInfo =
licenseInfoPromise.status === 'fulfilled'
? licenseInfoPromise.value
: ({} as ESLicense | undefined);
const endpointData = await fetchEndpointData(
receiver,
taskExecutionPeriod.last,
taskExecutionPeriod.current
);
/** STAGE 1 - Fetch Endpoint Agent Metrics
*
* Reads Endpoint Agent metrics out of the `.ds-metrics-endpoint.metrics` data stream
* and buckets them by Endpoint Agent id and sorts by the top hit. The EP agent will
* report its metrics once per day OR every time a policy change has occured. If
* a metric document(s) exists for an EP agent we map to fleet agent and policy
*/
if (endpointData.endpointMetrics === undefined) {
tlog(logger, `no endpoint metrics to report`);
return 0;
}
const { body: endpointMetricsResponse } = endpointData.endpointMetrics as unknown as {
body: EndpointMetricsAggregation;
};
if (endpointMetricsResponse.aggregations === undefined) {
tlog(logger, `no endpoint metrics to report`);
return 0;
}
const telemetryUsageCounter = sender.getTelemetryUsageCluster();
telemetryUsageCounter?.incrementCounter({
counterName: createUsageCounterLabel(
usageLabelPrefix.concat(['payloads', TELEMETRY_CHANNEL_ENDPOINT_META])
),
counterType: 'num_endpoint',
incrementBy: endpointMetricsResponse.aggregations.endpoint_count.value,
});
const endpointMetrics = endpointMetricsResponse.aggregations.endpoint_agents.buckets.map(
(epMetrics) => {
return {
endpoint_agent: epMetrics.latest_metrics.hits.hits[0]._source.agent.id,
endpoint_version: epMetrics.latest_metrics.hits.hits[0]._source.agent.version,
endpoint_metrics: epMetrics.latest_metrics.hits.hits[0]._source,
};
const startTime = Date.now();
const taskName = 'Security Solution Telemetry Endpoint Metrics and Info task';
try {
if (!taskExecutionPeriod.last) {
throw new Error('last execution timestamp is required');
}
);
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
/** STAGE 2 - Fetch Fleet Agent Config
*
* As the policy id + policy version does not exist on the Endpoint Metrics document
* we need to fetch information about the Fleet Agent and sync the metrics document
* with the Agent's policy data.
*
*/
const agentsResponse = endpointData.fleetAgentsResponse;
const clusterInfo =
clusterInfoPromise.status === 'fulfilled'
? clusterInfoPromise.value
: ({} as ESClusterInfo);
const licenseInfo =
licenseInfoPromise.status === 'fulfilled'
? licenseInfoPromise.value
: ({} as ESLicense | undefined);
if (agentsResponse === undefined) {
tlog(logger, 'no fleet agent information available');
return 0;
}
const endpointData = await fetchEndpointData(
receiver,
taskExecutionPeriod.last,
taskExecutionPeriod.current
);
/** STAGE 1 - Fetch Endpoint Agent Metrics
*
* Reads Endpoint Agent metrics out of the `.ds-metrics-endpoint.metrics` data stream
* and buckets them by Endpoint Agent id and sorts by the top hit. The EP agent will
* report its metrics once per day OR every time a policy change has occured. If
* a metric document(s) exists for an EP agent we map to fleet agent and policy
*/
if (endpointData.endpointMetrics === undefined) {
tlog(logger, `no endpoint metrics to report`);
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return 0;
}
const { body: endpointMetricsResponse } = endpointData.endpointMetrics as unknown as {
body: EndpointMetricsAggregation;
};
if (endpointMetricsResponse.aggregations === undefined) {
tlog(logger, `no endpoint metrics to report`);
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return 0;
}
const telemetryUsageCounter = sender.getTelemetryUsageCluster();
telemetryUsageCounter?.incrementCounter({
counterName: createUsageCounterLabel(
usageLabelPrefix.concat(['payloads', TELEMETRY_CHANNEL_ENDPOINT_META])
),
counterType: 'num_endpoint',
incrementBy: endpointMetricsResponse.aggregations.endpoint_count.value,
});
const endpointMetrics = endpointMetricsResponse.aggregations.endpoint_agents.buckets.map(
(epMetrics) => {
return {
endpoint_agent: epMetrics.latest_metrics.hits.hits[0]._source.agent.id,
endpoint_version: epMetrics.latest_metrics.hits.hits[0]._source.agent.version,
endpoint_metrics: epMetrics.latest_metrics.hits.hits[0]._source,
};
}
);
/** STAGE 2 - Fetch Fleet Agent Config
*
* As the policy id + policy version does not exist on the Endpoint Metrics document
* we need to fetch information about the Fleet Agent and sync the metrics document
* with the Agent's policy data.
*
*/
const agentsResponse = endpointData.fleetAgentsResponse;
if (agentsResponse === undefined) {
tlog(logger, 'no fleet agent information available');
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return 0;
}
const fleetAgents = agentsResponse.agents.reduce((cache, agent) => {
if (agent.id === DefaultEndpointPolicyIdToIgnore) {
return cache;
}
if (agent.policy_id !== null && agent.policy_id !== undefined) {
cache.set(agent.id, agent.policy_id);
}
const fleetAgents = agentsResponse.agents.reduce((cache, agent) => {
if (agent.id === DefaultEndpointPolicyIdToIgnore) {
return cache;
}
}, new Map<string, string>());
if (agent.policy_id !== null && agent.policy_id !== undefined) {
cache.set(agent.id, agent.policy_id);
}
const endpointPolicyCache = new Map<string, PolicyData>();
for (const policyInfo of fleetAgents.values()) {
if (
policyInfo !== null &&
policyInfo !== undefined &&
!endpointPolicyCache.has(policyInfo)
) {
tlog(logger, `policy info exists as ${policyInfo}`);
const agentPolicy = await receiver.fetchPolicyConfigs(policyInfo);
const packagePolicies = agentPolicy?.package_policies;
return cache;
}, new Map<string, string>());
const endpointPolicyCache = new Map<string, PolicyData>();
for (const policyInfo of fleetAgents.values()) {
if (
policyInfo !== null &&
policyInfo !== undefined &&
!endpointPolicyCache.has(policyInfo)
) {
tlog(logger, `policy info exists as ${policyInfo}`);
const agentPolicy = await receiver.fetchPolicyConfigs(policyInfo);
const packagePolicies = agentPolicy?.package_policies;
if (packagePolicies !== undefined && isPackagePolicyList(packagePolicies)) {
tlog(logger, `package policy exists as ${JSON.stringify(packagePolicies)}`);
packagePolicies
.map((pPolicy) => pPolicy as PolicyData)
.forEach((pPolicy) => {
if (pPolicy.inputs[0]?.config !== undefined && pPolicy.inputs[0]?.config !== null) {
pPolicy.inputs.forEach((input) => {
if (
input.type === FLEET_ENDPOINT_PACKAGE &&
input?.config !== undefined &&
policyInfo !== undefined
) {
endpointPolicyCache.set(policyInfo, pPolicy);
}
});
}
});
if (packagePolicies !== undefined && isPackagePolicyList(packagePolicies)) {
tlog(logger, `package policy exists as ${JSON.stringify(packagePolicies)}`);
packagePolicies
.map((pPolicy) => pPolicy as PolicyData)
.forEach((pPolicy) => {
if (
pPolicy.inputs[0]?.config !== undefined &&
pPolicy.inputs[0]?.config !== null
) {
pPolicy.inputs.forEach((input) => {
if (
input.type === FLEET_ENDPOINT_PACKAGE &&
input?.config !== undefined &&
policyInfo !== undefined
) {
endpointPolicyCache.set(policyInfo, pPolicy);
}
});
}
});
}
}
}
}
/** STAGE 3 - Fetch Endpoint Policy Responses
*
* Reads Endpoint Agent policy responses out of the `.ds-metrics-endpoint.policy*` data
* stream and creates a local K/V structure that stores the policy response (V) with
* the Endpoint Agent Id (K). A value will only exist if there has been a endpoint
* enrolled in the last 24 hours OR a policy change has occurred. We only send
* non-successful responses. If the field is null, we assume no responses in
* the last 24h or no failures/warnings in the policy applied.
*
*/
const { body: failedPolicyResponses } = endpointData.epPolicyResponse as unknown as {
body: EndpointPolicyResponseAggregation;
};
/** STAGE 3 - Fetch Endpoint Policy Responses
*
* Reads Endpoint Agent policy responses out of the `.ds-metrics-endpoint.policy*` data
* stream and creates a local K/V structure that stores the policy response (V) with
* the Endpoint Agent Id (K). A value will only exist if there has been a endpoint
* enrolled in the last 24 hours OR a policy change has occurred. We only send
* non-successful responses. If the field is null, we assume no responses in
* the last 24h or no failures/warnings in the policy applied.
*
*/
const { body: failedPolicyResponses } = endpointData.epPolicyResponse as unknown as {
body: EndpointPolicyResponseAggregation;
};
// If there is no policy responses in the 24h > now then we will continue
const policyResponses = failedPolicyResponses.aggregations
? failedPolicyResponses.aggregations.policy_responses.buckets.reduce(
// If there is no policy responses in the 24h > now then we will continue
const policyResponses = failedPolicyResponses.aggregations
? failedPolicyResponses.aggregations.policy_responses.buckets.reduce(
(cache, endpointAgentId) => {
const doc = endpointAgentId.latest_response.hits.hits[0];
cache.set(endpointAgentId.key, doc);
return cache;
},
new Map<string, EndpointPolicyResponseDocument>()
)
: new Map<string, EndpointPolicyResponseDocument>();
tlog(
logger,
`policy responses exists as ${JSON.stringify(Object.fromEntries(policyResponses))}`
);
/** STAGE 4 - Fetch Endpoint Agent Metadata
*
* Reads Endpoint Agent metadata out of the `.ds-metrics-endpoint.metadata` data stream
* and buckets them by Endpoint Agent id and sorts by the top hit. The EP agent will
* report its metadata once per day OR every time a policy change has occured. If
* a metadata document(s) exists for an EP agent we map to fleet agent and policy
*/
if (endpointData.endpointMetadata === undefined) {
tlog(logger, `no endpoint metadata to report`);
}
const { body: endpointMetadataResponse } = endpointData.endpointMetadata as unknown as {
body: EndpointMetadataAggregation;
};
if (endpointMetadataResponse.aggregations === undefined) {
tlog(logger, `no endpoint metadata to report`);
}
const endpointMetadata =
endpointMetadataResponse.aggregations.endpoint_metadata.buckets.reduce(
(cache, endpointAgentId) => {
const doc = endpointAgentId.latest_response.hits.hits[0];
const doc = endpointAgentId.latest_metadata.hits.hits[0];
cache.set(endpointAgentId.key, doc);
return cache;
},
new Map<string, EndpointPolicyResponseDocument>()
)
: new Map<string, EndpointPolicyResponseDocument>();
tlog(
logger,
`policy responses exists as ${JSON.stringify(Object.fromEntries(policyResponses))}`
);
/** STAGE 4 - Fetch Endpoint Agent Metadata
*
* Reads Endpoint Agent metadata out of the `.ds-metrics-endpoint.metadata` data stream
* and buckets them by Endpoint Agent id and sorts by the top hit. The EP agent will
* report its metadata once per day OR every time a policy change has occured. If
* a metadata document(s) exists for an EP agent we map to fleet agent and policy
*/
if (endpointData.endpointMetadata === undefined) {
tlog(logger, `no endpoint metadata to report`);
}
const { body: endpointMetadataResponse } = endpointData.endpointMetadata as unknown as {
body: EndpointMetadataAggregation;
};
if (endpointMetadataResponse.aggregations === undefined) {
tlog(logger, `no endpoint metadata to report`);
}
const endpointMetadata =
endpointMetadataResponse.aggregations.endpoint_metadata.buckets.reduce(
(cache, endpointAgentId) => {
const doc = endpointAgentId.latest_metadata.hits.hits[0];
cache.set(endpointAgentId.key, doc);
return cache;
},
new Map<string, EndpointMetadataDocument>()
new Map<string, EndpointMetadataDocument>()
);
tlog(
logger,
`endpoint metadata exists as ${JSON.stringify(Object.fromEntries(endpointMetadata))}`
);
tlog(
logger,
`endpoint metadata exists as ${JSON.stringify(Object.fromEntries(endpointMetadata))}`
);
/** STAGE 5 - Create the telemetry log records
*
* Iterates through the endpoint metrics documents at STAGE 1 and joins them together
* to form the telemetry log that is sent back to Elastic Security developers to
* make improvements to the product.
*
*/
try {
const telemetryPayloads = endpointMetrics.map((endpoint) => {
let policyConfig = null;
let failedPolicy = null;
let endpointMetadataById = null;
const fleetAgentId = endpoint.endpoint_metrics.elastic.agent.id;
const endpointAgentId = endpoint.endpoint_agent;
const policyInformation = fleetAgents.get(fleetAgentId);
if (policyInformation) {
policyConfig = endpointPolicyCache.get(policyInformation) || null;
if (policyConfig) {
failedPolicy = policyResponses.get(endpointAgentId);
}
}
if (endpointMetadata) {
endpointMetadataById = endpointMetadata.get(endpointAgentId);
}
const {
cpu,
memory,
uptime,
documents_volume: documentsVolume,
malicious_behavior_rules: maliciousBehaviorRules,
system_impact: systemImpact,
threads,
event_filter: eventFilter,
} = endpoint.endpoint_metrics.Endpoint.metrics;
const endpointPolicyDetail = extractEndpointPolicyConfig(policyConfig);
if (endpointPolicyDetail) {
endpointPolicyDetail.value = addDefaultAdvancedPolicyConfigSettings(
endpointPolicyDetail.value
);
}
return {
'@timestamp': taskExecutionPeriod.current,
cluster_uuid: clusterInfo.cluster_uuid,
cluster_name: clusterInfo.cluster_name,
license_id: licenseInfo?.uid,
endpoint_id: endpointAgentId,
endpoint_version: endpoint.endpoint_version,
endpoint_package_version: policyConfig?.package?.version || null,
endpoint_metrics: {
cpu: cpu.endpoint,
memory: memory.endpoint.private,
uptime,
documentsVolume,
maliciousBehaviorRules,
systemImpact,
threads,
eventFilter,
},
endpoint_meta: {
os: endpoint.endpoint_metrics.host.os,
capabilities:
endpointMetadataById !== null && endpointMetadataById !== undefined
? endpointMetadataById._source.Endpoint.capabilities
: [],
},
policy_config: endpointPolicyDetail !== null ? endpointPolicyDetail : {},
policy_response:
failedPolicy !== null && failedPolicy !== undefined
? {
agent_policy_status: failedPolicy._source.event.agent_id_status,
manifest_version:
failedPolicy._source.Endpoint.policy.applied.artifacts.global.version,
status: failedPolicy._source.Endpoint.policy.applied.status,
actions: failedPolicy._source.Endpoint.policy.applied.actions
.map((action) => (action.status !== 'success' ? action : null))
.filter((action) => action !== null),
configuration: failedPolicy._source.Endpoint.configuration,
state: failedPolicy._source.Endpoint.state,
}
: {},
telemetry_meta: {
metrics_timestamp: endpoint.endpoint_metrics['@timestamp'],
},
};
});
/**
* STAGE 6 - Send the documents
/** STAGE 5 - Create the telemetry log records
*
* Iterates through the endpoint metrics documents at STAGE 1 and joins them together
* to form the telemetry log that is sent back to Elastic Security developers to
* make improvements to the product.
*
* Send the documents in a batches of maxTelemetryBatch
*/
const batches = batchTelemetryRecords(telemetryPayloads, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_ENDPOINT_META, batch);
try {
const telemetryPayloads = endpointMetrics.map((endpoint) => {
let policyConfig = null;
let failedPolicy = null;
let endpointMetadataById = null;
const fleetAgentId = endpoint.endpoint_metrics.elastic.agent.id;
const endpointAgentId = endpoint.endpoint_agent;
const policyInformation = fleetAgents.get(fleetAgentId);
if (policyInformation) {
policyConfig = endpointPolicyCache.get(policyInformation) || null;
if (policyConfig) {
failedPolicy = policyResponses.get(endpointAgentId);
}
}
if (endpointMetadata) {
endpointMetadataById = endpointMetadata.get(endpointAgentId);
}
const {
cpu,
memory,
uptime,
documents_volume: documentsVolume,
malicious_behavior_rules: maliciousBehaviorRules,
system_impact: systemImpact,
threads,
event_filter: eventFilter,
} = endpoint.endpoint_metrics.Endpoint.metrics;
const endpointPolicyDetail = extractEndpointPolicyConfig(policyConfig);
if (endpointPolicyDetail) {
endpointPolicyDetail.value = addDefaultAdvancedPolicyConfigSettings(
endpointPolicyDetail.value
);
}
return {
'@timestamp': taskExecutionPeriod.current,
cluster_uuid: clusterInfo.cluster_uuid,
cluster_name: clusterInfo.cluster_name,
license_id: licenseInfo?.uid,
endpoint_id: endpointAgentId,
endpoint_version: endpoint.endpoint_version,
endpoint_package_version: policyConfig?.package?.version || null,
endpoint_metrics: {
cpu: cpu.endpoint,
memory: memory.endpoint.private,
uptime,
documentsVolume,
maliciousBehaviorRules,
systemImpact,
threads,
eventFilter,
},
endpoint_meta: {
os: endpoint.endpoint_metrics.host.os,
capabilities:
endpointMetadataById !== null && endpointMetadataById !== undefined
? endpointMetadataById._source.Endpoint.capabilities
: [],
},
policy_config: endpointPolicyDetail !== null ? endpointPolicyDetail : {},
policy_response:
failedPolicy !== null && failedPolicy !== undefined
? {
agent_policy_status: failedPolicy._source.event.agent_id_status,
manifest_version:
failedPolicy._source.Endpoint.policy.applied.artifacts.global.version,
status: failedPolicy._source.Endpoint.policy.applied.status,
actions: failedPolicy._source.Endpoint.policy.applied.actions
.map((action) => (action.status !== 'success' ? action : null))
.filter((action) => action !== null),
configuration: failedPolicy._source.Endpoint.configuration,
state: failedPolicy._source.Endpoint.state,
}
: {},
telemetry_meta: {
metrics_timestamp: endpoint.endpoint_metrics['@timestamp'],
},
};
});
/**
* STAGE 6 - Send the documents
*
* Send the documents in a batches of maxTelemetryBatch
*/
const batches = batchTelemetryRecords(telemetryPayloads, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_ENDPOINT_META, batch);
}
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return telemetryPayloads.length;
} catch (err) {
logger.warn(`could not complete endpoint alert telemetry task due to ${err?.message}`);
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, false, startTime, err.message),
]);
return 0;
}
return telemetryPayloads.length;
} catch (err) {
logger.warn(`could not complete endpoint alert telemetry task due to ${err?.message}`);
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, false, startTime, err.message),
]);
return 0;
}
},

View file

@ -10,8 +10,8 @@ import type { ITelemetryEventsSender } from '../sender';
import type { ITelemetryReceiver } from '../receiver';
import type { ESClusterInfo, ESLicense, TelemetryEvent } from '../types';
import type { TaskExecutionPeriod } from '../task';
import { TELEMETRY_CHANNEL_DETECTION_ALERTS } from '../constants';
import { batchTelemetryRecords, tlog } from '../helpers';
import { TELEMETRY_CHANNEL_DETECTION_ALERTS, TASK_METRICS_CHANNEL } from '../constants';
import { batchTelemetryRecords, tlog, createTaskMetric } from '../helpers';
import { copyAllowlistedFields, prebuiltRuleAllowlistFields } from '../filterlists';
export function createTelemetryPrebuiltRuleAlertsTaskConfig(maxTelemetryBatch: number) {
@ -28,6 +28,8 @@ export function createTelemetryPrebuiltRuleAlertsTaskConfig(maxTelemetryBatch: n
sender: ITelemetryEventsSender,
taskExecutionPeriod: TaskExecutionPeriod
) => {
const startTime = Date.now();
const taskName = 'Security Solution - Prebuilt Rule and Elastic ML Alerts Telemetry';
try {
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
@ -54,6 +56,9 @@ export function createTelemetryPrebuiltRuleAlertsTaskConfig(maxTelemetryBatch: n
if (telemetryEvents.length === 0) {
tlog(logger, 'no prebuilt rule alerts retrieved');
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return 0;
}
@ -76,10 +81,15 @@ export function createTelemetryPrebuiltRuleAlertsTaskConfig(maxTelemetryBatch: n
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_DETECTION_ALERTS, batch);
}
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return enrichedAlerts.length;
} catch (err) {
logger.error('could not complete prebuilt alerts telemetry task');
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, false, startTime, err.message),
]);
return 0;
}
},

View file

@ -15,9 +15,10 @@ import {
LIST_ENDPOINT_EVENT_FILTER,
LIST_TRUSTED_APPLICATION,
TELEMETRY_CHANNEL_LISTS,
TASK_METRICS_CHANNEL,
} from '../constants';
import type { ESClusterInfo, ESLicense } from '../types';
import { batchTelemetryRecords, templateExceptionList, tlog } from '../helpers';
import { batchTelemetryRecords, templateExceptionList, tlog, createTaskMetric } from '../helpers';
import type { ITelemetryEventsSender } from '../sender';
import type { ITelemetryReceiver } from '../receiver';
import type { TaskExecutionPeriod } from '../task';
@ -36,88 +37,100 @@ export function createTelemetrySecurityListTaskConfig(maxTelemetryBatch: number)
sender: ITelemetryEventsSender,
taskExecutionPeriod: TaskExecutionPeriod
) => {
let count = 0;
const startTime = Date.now();
const taskName = 'Security Solution Lists Telemetry';
try {
let count = 0;
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
const clusterInfo =
clusterInfoPromise.status === 'fulfilled'
? clusterInfoPromise.value
: ({} as ESClusterInfo);
const licenseInfo =
licenseInfoPromise.status === 'fulfilled'
? licenseInfoPromise.value
: ({} as ESLicense | undefined);
const FETCH_VALUE_LIST_META_DATA_INTERVAL_IN_HOURS = 24;
const clusterInfo =
clusterInfoPromise.status === 'fulfilled'
? clusterInfoPromise.value
: ({} as ESClusterInfo);
const licenseInfo =
licenseInfoPromise.status === 'fulfilled'
? licenseInfoPromise.value
: ({} as ESLicense | undefined);
const FETCH_VALUE_LIST_META_DATA_INTERVAL_IN_HOURS = 24;
// Lists Telemetry: Trusted Applications
const trustedApps = await receiver.fetchTrustedApplications();
if (trustedApps?.data) {
const trustedAppsJson = templateExceptionList(
trustedApps.data,
clusterInfo,
licenseInfo,
LIST_TRUSTED_APPLICATION
);
tlog(logger, `Trusted Apps: ${trustedAppsJson}`);
count += trustedAppsJson.length;
// Lists Telemetry: Trusted Applications
const trustedApps = await receiver.fetchTrustedApplications();
if (trustedApps?.data) {
const trustedAppsJson = templateExceptionList(
trustedApps.data,
clusterInfo,
licenseInfo,
LIST_TRUSTED_APPLICATION
);
tlog(logger, `Trusted Apps: ${trustedAppsJson}`);
count += trustedAppsJson.length;
const batches = batchTelemetryRecords(trustedAppsJson, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, batch);
const batches = batchTelemetryRecords(trustedAppsJson, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, batch);
}
}
}
// Lists Telemetry: Endpoint Exceptions
// Lists Telemetry: Endpoint Exceptions
const epExceptions = await receiver.fetchEndpointList(ENDPOINT_LIST_ID);
if (epExceptions?.data) {
const epExceptionsJson = templateExceptionList(
epExceptions.data,
clusterInfo,
licenseInfo,
LIST_ENDPOINT_EXCEPTION
);
tlog(logger, `EP Exceptions: ${epExceptionsJson}`);
count += epExceptionsJson.length;
const epExceptions = await receiver.fetchEndpointList(ENDPOINT_LIST_ID);
if (epExceptions?.data) {
const epExceptionsJson = templateExceptionList(
epExceptions.data,
clusterInfo,
licenseInfo,
LIST_ENDPOINT_EXCEPTION
);
tlog(logger, `EP Exceptions: ${epExceptionsJson}`);
count += epExceptionsJson.length;
const batches = batchTelemetryRecords(epExceptionsJson, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, batch);
const batches = batchTelemetryRecords(epExceptionsJson, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, batch);
}
}
}
// Lists Telemetry: Endpoint Event Filters
// Lists Telemetry: Endpoint Event Filters
const epFilters = await receiver.fetchEndpointList(ENDPOINT_EVENT_FILTERS_LIST_ID);
if (epFilters?.data) {
const epFiltersJson = templateExceptionList(
epFilters.data,
clusterInfo,
licenseInfo,
LIST_ENDPOINT_EVENT_FILTER
);
tlog(logger, `EP Event Filters: ${epFiltersJson}`);
count += epFiltersJson.length;
const epFilters = await receiver.fetchEndpointList(ENDPOINT_EVENT_FILTERS_LIST_ID);
if (epFilters?.data) {
const epFiltersJson = templateExceptionList(
epFilters.data,
clusterInfo,
licenseInfo,
LIST_ENDPOINT_EVENT_FILTER
);
tlog(logger, `EP Event Filters: ${epFiltersJson}`);
count += epFiltersJson.length;
const batches = batchTelemetryRecords(epFiltersJson, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, batch);
const batches = batchTelemetryRecords(epFiltersJson, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, batch);
}
}
}
// Value list meta data
const valueListMetaData = await receiver.fetchValueListMetaData(
FETCH_VALUE_LIST_META_DATA_INTERVAL_IN_HOURS
);
tlog(logger, `Value List Meta Data: ${JSON.stringify(valueListMetaData)}`);
if (valueListMetaData?.total_list_count) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, [valueListMetaData]);
// Value list meta data
const valueListMetaData = await receiver.fetchValueListMetaData(
FETCH_VALUE_LIST_META_DATA_INTERVAL_IN_HOURS
);
tlog(logger, `Value List Meta Data: ${JSON.stringify(valueListMetaData)}`);
if (valueListMetaData?.total_list_count) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_LISTS, [valueListMetaData]);
}
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return count;
} catch (err) {
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, false, startTime, err.message),
]);
return 0;
}
return count;
},
};
}

View file

@ -60,6 +60,5 @@ describe('timeline telemetry task test', () => {
expect(mockTelemetryReceiver.buildProcessTree).toHaveBeenCalled();
expect(mockTelemetryReceiver.fetchTimelineEvents).toHaveBeenCalled();
expect(mockTelemetryReceiver.fetchTimelineEndpointAlerts).toHaveBeenCalled();
expect(mockTelemetryEventsSender.sendOnDemand).not.toHaveBeenCalled();
});
});

View file

@ -17,9 +17,9 @@ import type {
TimelineTelemetryTemplate,
TimelineTelemetryEvent,
} from '../types';
import { TELEMETRY_CHANNEL_TIMELINE } from '../constants';
import { TELEMETRY_CHANNEL_TIMELINE, TASK_METRICS_CHANNEL } from '../constants';
import { resolverEntity } from '../../../endpoint/routes/resolver/entity/utils/build_resolver_entity';
import { tlog } from '../helpers';
import { tlog, createTaskMetric } from '../helpers';
export function createTelemetryTimelineTaskConfig() {
return {
@ -35,145 +35,159 @@ export function createTelemetryTimelineTaskConfig() {
sender: ITelemetryEventsSender,
taskExecutionPeriod: TaskExecutionPeriod
) => {
let counter = 0;
const startTime = Date.now();
const taskName = 'Security Solution Timeline telemetry';
try {
let counter = 0;
tlog(logger, `Running task: ${taskId}`);
tlog(logger, `Running task: ${taskId}`);
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
const clusterInfo =
clusterInfoPromise.status === 'fulfilled'
? clusterInfoPromise.value
: ({} as ESClusterInfo);
const clusterInfo =
clusterInfoPromise.status === 'fulfilled'
? clusterInfoPromise.value
: ({} as ESClusterInfo);
const licenseInfo =
licenseInfoPromise.status === 'fulfilled'
? licenseInfoPromise.value
: ({} as ESLicense | undefined);
const licenseInfo =
licenseInfoPromise.status === 'fulfilled'
? licenseInfoPromise.value
: ({} as ESLicense | undefined);
const now = moment();
const startOfDay = now.startOf('day').toISOString();
const endOfDay = now.endOf('day').toISOString();
const now = moment();
const startOfDay = now.startOf('day').toISOString();
const endOfDay = now.endOf('day').toISOString();
const baseDocument = {
version: clusterInfo.version?.number,
cluster_name: clusterInfo.cluster_name,
cluster_uuid: clusterInfo.cluster_uuid,
license_uuid: licenseInfo?.uid,
};
const baseDocument = {
version: clusterInfo.version?.number,
cluster_name: clusterInfo.cluster_name,
cluster_uuid: clusterInfo.cluster_uuid,
license_uuid: licenseInfo?.uid,
};
// Fetch EP Alerts
// Fetch EP Alerts
const endpointAlerts = await receiver.fetchTimelineEndpointAlerts(3);
const aggregations = endpointAlerts?.aggregations as unknown as {
endpoint_alert_count: { value: number };
};
tlog(logger, `Endpoint alert count: ${aggregations?.endpoint_alert_count}`);
sender.getTelemetryUsageCluster()?.incrementCounter({
counterName: 'telemetry_endpoint_alert',
counterType: 'endpoint_alert_count',
incrementBy: aggregations?.endpoint_alert_count.value,
});
// No EP Alerts -> Nothing to do
if (
endpointAlerts.hits.hits?.length === 0 ||
endpointAlerts.hits.hits?.length === undefined
) {
tlog(logger, 'no endpoint alerts received. exiting telemetry task.');
return counter;
}
// Build process tree for each EP Alert received
for (const alert of endpointAlerts.hits.hits) {
const eventId = alert._source ? alert._source['event.id'] : 'unknown';
const alertUUID = alert._source ? alert._source['kibana.alert.uuid'] : 'unknown';
const entities = resolverEntity([alert]);
// Build Tree
const tree = await receiver.buildProcessTree(
entities[0].id,
entities[0].schema,
startOfDay,
endOfDay
);
const nodeIds = [] as string[];
if (Array.isArray(tree)) {
for (const node of tree) {
const nodeId = node?.id.toString();
nodeIds.push(nodeId);
}
}
const endpointAlerts = await receiver.fetchTimelineEndpointAlerts(3);
const aggregations = endpointAlerts?.aggregations as unknown as {
endpoint_alert_count: { value: number };
};
tlog(logger, `Endpoint alert count: ${aggregations?.endpoint_alert_count}`);
sender.getTelemetryUsageCluster()?.incrementCounter({
counterName: 'telemetry_timeline',
counterType: 'timeline_node_count',
incrementBy: nodeIds.length,
counterName: 'telemetry_endpoint_alert',
counterType: 'endpoint_alert_count',
incrementBy: aggregations?.endpoint_alert_count.value,
});
// Fetch event lineage
const timelineEvents = await receiver.fetchTimelineEvents(nodeIds);
tlog(logger, `Timeline Events: ${JSON.stringify(timelineEvents)}`);
const eventsStore = new Map<string, SafeEndpointEvent>();
for (const event of timelineEvents.hits.hits) {
const doc = event._source;
if (doc !== null && doc !== undefined) {
const entityId = doc?.process?.entity_id?.toString();
if (entityId !== null && entityId !== undefined) eventsStore.set(entityId, doc);
}
// No EP Alerts -> Nothing to do
if (
endpointAlerts.hits.hits?.length === 0 ||
endpointAlerts.hits.hits?.length === undefined
) {
tlog(logger, 'no endpoint alerts received. exiting telemetry task.');
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return counter;
}
sender.getTelemetryUsageCluster()?.incrementCounter({
counterName: 'telemetry_timeline',
counterType: 'timeline_event_count',
incrementBy: eventsStore.size,
});
// Build process tree for each EP Alert received
// Create telemetry record
for (const alert of endpointAlerts.hits.hits) {
const eventId = alert._source ? alert._source['event.id'] : 'unknown';
const alertUUID = alert._source ? alert._source['kibana.alert.uuid'] : 'unknown';
const telemetryTimeline: TimelineTelemetryEvent[] = [];
if (Array.isArray(tree)) {
for (const node of tree) {
const id = node.id.toString();
const event = eventsStore.get(id);
const entities = resolverEntity([alert]);
const timelineTelemetryEvent: TimelineTelemetryEvent = {
...node,
event,
// Build Tree
const tree = await receiver.buildProcessTree(
entities[0].id,
entities[0].schema,
startOfDay,
endOfDay
);
const nodeIds = [] as string[];
if (Array.isArray(tree)) {
for (const node of tree) {
const nodeId = node?.id.toString();
nodeIds.push(nodeId);
}
}
sender.getTelemetryUsageCluster()?.incrementCounter({
counterName: 'telemetry_timeline',
counterType: 'timeline_node_count',
incrementBy: nodeIds.length,
});
// Fetch event lineage
const timelineEvents = await receiver.fetchTimelineEvents(nodeIds);
tlog(logger, `Timeline Events: ${JSON.stringify(timelineEvents)}`);
const eventsStore = new Map<string, SafeEndpointEvent>();
for (const event of timelineEvents.hits.hits) {
const doc = event._source;
if (doc !== null && doc !== undefined) {
const entityId = doc?.process?.entity_id?.toString();
if (entityId !== null && entityId !== undefined) eventsStore.set(entityId, doc);
}
}
sender.getTelemetryUsageCluster()?.incrementCounter({
counterName: 'telemetry_timeline',
counterType: 'timeline_event_count',
incrementBy: eventsStore.size,
});
// Create telemetry record
const telemetryTimeline: TimelineTelemetryEvent[] = [];
if (Array.isArray(tree)) {
for (const node of tree) {
const id = node.id.toString();
const event = eventsStore.get(id);
const timelineTelemetryEvent: TimelineTelemetryEvent = {
...node,
event,
};
telemetryTimeline.push(timelineTelemetryEvent);
}
}
if (telemetryTimeline.length >= 1) {
const record: TimelineTelemetryTemplate = {
'@timestamp': moment().toISOString(),
...baseDocument,
alert_id: alertUUID,
event_id: eventId,
timeline: telemetryTimeline,
};
telemetryTimeline.push(timelineTelemetryEvent);
sender.sendOnDemand(TELEMETRY_CHANNEL_TIMELINE, [record]);
counter += 1;
} else {
tlog(logger, 'no events in timeline');
}
}
if (telemetryTimeline.length >= 1) {
const record: TimelineTelemetryTemplate = {
'@timestamp': moment().toISOString(),
...baseDocument,
alert_id: alertUUID,
event_id: eventId,
timeline: telemetryTimeline,
};
sender.sendOnDemand(TELEMETRY_CHANNEL_TIMELINE, [record]);
counter += 1;
} else {
tlog(logger, 'no events in timeline');
}
tlog(logger, `sent ${counter} timelines. concluding timeline task.`);
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, true, startTime),
]);
return counter;
} catch (err) {
await sender.sendOnDemand(TASK_METRICS_CHANNEL, [
createTaskMetric(taskName, false, startTime, err.message),
]);
return 0;
}
tlog(logger, `sent ${counter} timelines. concluding timeline task.`);
return counter;
},
};
}

View file

@ -12,6 +12,7 @@ import {
deleteAllAlerts,
deleteSignalsIndex,
getSecurityTelemetryStats,
removeTimeFieldsFromTelemetryStats,
} from '../../../../utils';
import { deleteAllExceptions } from '../../../../../lists_api_integration/utils';
@ -41,14 +42,43 @@ export default ({ getService }: FtrProviderContext) => {
await deleteAllExceptions(supertest, log);
});
it('should have initialized empty/zero values when no rules are running', async () => {
it('should only have task metric values when no rules are running', async () => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
expect(stats).to.eql({
detection_rules: [],
security_lists: [],
endpoints: [],
diagnostics: [],
detection_rules: [
[
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
],
],
security_lists: [
[
{
name: 'Security Solution Lists Telemetry',
passed: true,
},
],
],
endpoints: [
[
{
name: 'Security Solution Telemetry Endpoint Metrics and Info task',
passed: true,
},
],
],
diagnostics: [
[
{
name: 'Security Solution Telemetry Diagnostics task',
passed: true,
},
],
],
});
});
});

View file

@ -21,6 +21,7 @@ import {
getSecurityTelemetryStats,
createExceptionList,
createExceptionListItem,
removeTimeFieldsFromTelemetryStats,
} from '../../../../utils';
import { deleteAllExceptions } from '../../../../../lists_api_integration/utils';
@ -100,7 +101,15 @@ export default ({ getService }: FtrProviderContext) => {
// Get the stats and ensure they're empty
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
expect(stats.detection_rules).to.eql([]);
removeTimeFieldsFromTelemetryStats(stats);
expect(stats.detection_rules).to.eql([
[
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
],
]);
});
});
@ -148,7 +157,15 @@ export default ({ getService }: FtrProviderContext) => {
// Get the stats and ensure they're empty
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
expect(stats.detection_rules).to.eql([]);
removeTimeFieldsFromTelemetryStats(stats);
expect(stats.detection_rules).to.eql([
[
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
],
]);
});
});
@ -196,7 +213,15 @@ export default ({ getService }: FtrProviderContext) => {
// Get the stats and ensure they're empty
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
expect(stats.detection_rules).to.eql([]);
removeTimeFieldsFromTelemetryStats(stats);
expect(stats.detection_rules).to.eql([
[
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
],
]);
});
});
@ -244,7 +269,15 @@ export default ({ getService }: FtrProviderContext) => {
// Get the stats and ensure they're empty
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
expect(stats.detection_rules).to.eql([]);
removeTimeFieldsFromTelemetryStats(stats);
expect(stats.detection_rules).to.eql([
[
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
],
]);
});
});
@ -292,7 +325,15 @@ export default ({ getService }: FtrProviderContext) => {
// Get the stats and ensure they're empty
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
expect(stats.detection_rules).to.eql([]);
removeTimeFieldsFromTelemetryStats(stats);
expect(stats.detection_rules).to.eql([
[
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
],
]);
});
});
});
@ -350,7 +391,7 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
expect(stats.detection_rules).length(1);
expect(stats.detection_rules).length(2);
const detectionRule = stats.detection_rules[0][0];
expect(detectionRule['@timestamp']).to.be.a('string');
expect(detectionRule.cluster_uuid).to.be.a('string');
@ -408,9 +449,10 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const detectionRules = stats.detection_rules
.flat()
.map((obj: { detection_rule: any }) => obj.detection_rule);
.map((obj: any) => (obj.passed != null ? obj : obj.detection_rule));
expect(detectionRules).to.eql([
{
@ -428,6 +470,10 @@ export default ({ getService }: FtrProviderContext) => {
os_types: [],
rule_version: detectionRules[0].rule_version,
},
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
]);
});
});
@ -479,9 +525,10 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const detectionRules = stats.detection_rules
.flat()
.map((obj: { detection_rule: any }) => obj.detection_rule);
.map((obj: any) => (obj.passed != null ? obj : obj.detection_rule));
expect(detectionRules).to.eql([
{
@ -499,6 +546,10 @@ export default ({ getService }: FtrProviderContext) => {
os_types: [],
rule_version: detectionRules[0].rule_version,
},
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
]);
});
});
@ -550,9 +601,10 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const detectionRules = stats.detection_rules
.flat()
.map((obj: { detection_rule: any }) => obj.detection_rule);
.map((obj: any) => (obj.passed != null ? obj : obj.detection_rule));
expect(detectionRules).to.eql([
{
@ -570,6 +622,10 @@ export default ({ getService }: FtrProviderContext) => {
os_types: [],
rule_version: detectionRules[0].rule_version,
},
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
]);
});
});
@ -621,9 +677,10 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const detectionRules = stats.detection_rules
.flat()
.map((obj: { detection_rule: any }) => obj.detection_rule);
.map((obj: any) => (obj.passed != null ? obj : obj.detection_rule));
expect(detectionRules).to.eql([
{
@ -641,6 +698,10 @@ export default ({ getService }: FtrProviderContext) => {
os_types: [],
rule_version: detectionRules[0].rule_version,
},
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
]);
});
});
@ -692,9 +753,10 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const detectionRules = stats.detection_rules
.flat()
.map((obj: { detection_rule: any }) => obj.detection_rule);
.map((obj: any) => (obj.passed != null ? obj : obj.detection_rule));
expect(detectionRules).to.eql([
{
@ -712,6 +774,10 @@ export default ({ getService }: FtrProviderContext) => {
os_types: [],
rule_version: detectionRules[0].rule_version,
},
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
]);
});
});
@ -787,11 +853,12 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const detectionRules = stats.detection_rules
.flat()
.map((obj: { detection_rule: any }) => obj.detection_rule)
.map((obj: any) => (obj.passed != null ? obj : obj.detection_rule))
.sort((obj1: { entries: { name: number } }, obj2: { entries: { name: number } }) => {
return obj1.entries.name - obj2.entries.name;
return obj1?.entries?.name - obj2?.entries?.name;
});
expect(detectionRules).to.eql([
@ -825,6 +892,10 @@ export default ({ getService }: FtrProviderContext) => {
os_types: [],
rule_version: detectionRules[1].rule_version,
},
{
name: 'Security Solution Detection Rule Lists Telemetry',
passed: true,
},
]);
});
});

View file

@ -19,6 +19,7 @@ import {
getSecurityTelemetryStats,
createExceptionListItem,
createExceptionList,
removeTimeFieldsFromTelemetryStats,
} from '../../../../utils';
import { deleteAllExceptions } from '../../../../../lists_api_integration/utils';
@ -72,10 +73,10 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const trustedApplication = stats.security_lists
.flat()
.map((obj: { trusted_application: any }) => obj.trusted_application);
.map((obj: any) => (obj.passed != null ? obj : obj.trusted_application));
expect(trustedApplication).to.eql([
{
created_at: trustedApplication[0].created_at,
@ -95,6 +96,10 @@ export default ({ getService }: FtrProviderContext) => {
policies: [],
},
},
{
name: 'Security Solution Lists Telemetry',
passed: true,
},
]);
});
});
@ -138,12 +143,12 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const trustedApplication = stats.security_lists
.flat()
.map((obj: { trusted_application: any }) => obj.trusted_application)
.map((obj: any) => (obj.passed != null ? obj : obj.trusted_application))
.sort((obj1: { entries: { name: number } }, obj2: { entries: { name: number } }) => {
return obj1.entries.name - obj2.entries.name;
return obj1?.entries?.name - obj2?.entries?.name;
});
expect(trustedApplication).to.eql([
@ -183,6 +188,10 @@ export default ({ getService }: FtrProviderContext) => {
policies: [],
},
},
{
name: 'Security Solution Lists Telemetry',
passed: true,
},
]);
});
});
@ -210,9 +219,10 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const securityLists = stats.security_lists
.flat()
.map((obj: { endpoint_exception: any }) => obj.endpoint_exception);
.map((obj: any) => (obj.passed != null ? obj : obj.endpoint_exception));
expect(securityLists).to.eql([
{
created_at: securityLists[0].created_at,
@ -228,6 +238,10 @@ export default ({ getService }: FtrProviderContext) => {
name: ENDPOINT_LIST_ID,
os_types: [],
},
{
name: 'Security Solution Lists Telemetry',
passed: true,
},
]);
});
});
@ -271,11 +285,12 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const securityLists = stats.security_lists
.flat()
.map((obj: { endpoint_exception: any }) => obj.endpoint_exception)
.map((obj: any) => (obj.passed != null ? obj : obj.endpoint_exception))
.sort((obj1: { entries: { name: number } }, obj2: { entries: { name: number } }) => {
return obj1.entries.name - obj2.entries.name;
return obj1?.entries?.name - obj2?.entries?.name;
});
expect(securityLists).to.eql([
@ -307,6 +322,10 @@ export default ({ getService }: FtrProviderContext) => {
name: ENDPOINT_LIST_ID,
os_types: [],
},
{
name: 'Security Solution Lists Telemetry',
passed: true,
},
]);
});
});
@ -346,9 +365,11 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const endPointEventFilter = stats.security_lists
.flat()
.map((obj: { endpoint_event_filter: any }) => obj.endpoint_event_filter);
.map((obj: any) => (obj.passed != null ? obj : obj.endpoint_event_filter));
expect(endPointEventFilter).to.eql([
{
created_at: endPointEventFilter[0].created_at,
@ -364,6 +385,10 @@ export default ({ getService }: FtrProviderContext) => {
name: ENDPOINT_EVENT_FILTERS_LIST_ID,
os_types: ['linux'],
},
{
name: 'Security Solution Lists Telemetry',
passed: true,
},
]);
});
});
@ -407,11 +432,12 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeTimeFieldsFromTelemetryStats(stats);
const endPointEventFilter = stats.security_lists
.flat()
.map((obj: { endpoint_event_filter: any }) => obj.endpoint_event_filter)
.map((obj: any) => (obj.passed != null ? obj : obj.endpoint_event_filter))
.sort((obj1: { entries: { name: number } }, obj2: { entries: { name: number } }) => {
return obj1.entries.name - obj2.entries.name;
return obj1?.entries?.name - obj2?.entries?.name;
});
expect(endPointEventFilter).to.eql([
@ -443,6 +469,10 @@ export default ({ getService }: FtrProviderContext) => {
name: ENDPOINT_EVENT_FILTERS_LIST_ID,
os_types: ['macos'],
},
{
name: 'Security Solution Lists Telemetry',
passed: true,
},
]);
});
});

View file

@ -80,6 +80,7 @@ export * from './get_web_hook_action';
export * from './index_event_log_execution_events';
export * from './install_prepackaged_rules';
export * from './refresh_index';
export * from './remove_time_fields_from_telemetry_stats';
export * from './remove_server_generated_properties';
export * from './remove_server_generated_properties_including_rule_id';
export * from './resolve_simple_rule_output';

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { unset } from 'lodash';
/**
 * Strips the non-deterministic timing fields (`time_executed_in_ms`,
 * `start_time`, `end_time`) from every task-metric entry in a telemetry
 * stats payload so tests can compare the remaining fields with `eql`.
 *
 * The payload shape (as seen in the callers) is `{ channel: entry[][] }`:
 * each channel value is an array of batches, each batch an array of metric
 * objects. Mutates `stats` in place; returns nothing.
 *
 * @param stats telemetry stats object (e.g. from getSecurityTelemetryStats)
 */
export const removeTimeFieldsFromTelemetryStats = (stats: any) => {
  const TIME_FIELDS = ['time_executed_in_ms', 'start_time', 'end_time'] as const;
  for (const batches of Object.values(stats)) {
    for (const batch of batches as any[]) {
      for (const metric of batch) {
        // Guard against sparse/null entries before deleting properties.
        if (metric != null) {
          for (const field of TIME_FIELDS) {
            delete metric[field];
          }
        }
      }
    }
  }
};