Mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 09:48:58 -04:00)

[Security Solution][Telemetry] Add integration tests (#181927)

Parent: 57a27278e9
Commit: 2cc5109484
8 changed files with 775 additions and 309 deletions

@@ -4,7 +4,7 @@
"build": {
"original": "version: 8.6.0, compiled: Mon Jan 2 23:00:00 2023, branch: 8.6, commit: e2d09ff1b8e49bfb5f8940d317eb4ac96672d956"
},
"id": "123",
"id": "456",
"type": "endpoint",
"version": "8.6.0"
},

@@ -22,8 +22,8 @@ const asyncUnlink = Util.promisify(Fs.unlink);
*/
export async function eventually<T>(
cb: () => Promise<T>,
duration: number = 30000,
interval: number = 200
duration: number = 60000,
interval: number = 1000
) {
let elapsed = 0;

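The hunk above only shows the changed signature of the `eventually` test helper (default timeout raised from 30s to 60s, poll interval from 200ms to 1s). For reference, a polling helper with this shape typically looks like the following sketch; the body is an assumption for illustration, not the exact implementation from the helpers file:

// Illustrative sketch only (assumed body): retry the callback until it stops
// throwing or the duration elapses, sleeping `interval` ms between attempts.
export async function eventually<T>(
  cb: () => Promise<T>,
  duration: number = 60000,
  interval: number = 1000
): Promise<T> {
  let elapsed = 0;
  let lastError: unknown;
  while (elapsed < duration) {
    try {
      return await cb();
    } catch (e) {
      lastError = e;
      await new Promise((resolve) => setTimeout(resolve, interval));
      elapsed += interval;
    }
  }
  throw lastError ?? new Error(`condition not met within ${duration}ms`);
}
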
@@ -32,6 +32,7 @@ import {
initEndpointIndices,
dropEndpointIndices,
mockEndpointData,
getTelemetryReceiver,
} from './lib/telemetry_helpers';

import {
@@ -47,6 +48,9 @@ import type { SecurityTelemetryTask } from '../lib/telemetry/task';
import { TelemetryChannel } from '../lib/telemetry/types';
import type { AsyncTelemetryEventsSender } from '../lib/telemetry/async_sender';
import endpointMetaTelemetryRequest from './__mocks__/endpoint-meta-telemetry-request.json';
import type { ITelemetryReceiver, TelemetryReceiver } from '../lib/telemetry/receiver';
import type { TaskMetric } from '../lib/telemetry/task_metrics.types';
import type { AgentPolicy } from '@kbn/fleet-plugin/common';

jest.mock('axios');

@@ -57,6 +61,10 @@ const securitySolutionStartSpy = jest.spyOn(SecuritySolutionPlugin.prototype, 's
const mockedAxiosGet = jest.spyOn(axios, 'get');
const mockedAxiosPost = jest.spyOn(axios, 'post');

const securitySolutionPlugin = jest.spyOn(SecuritySolutionPlugin.prototype, 'start');

type Defer = () => void;

describe('telemetry tasks', () => {
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
@@ -66,6 +74,8 @@ describe('telemetry tasks', () => {
let exceptionsList: ExceptionListSchema[] = [];
let exceptionsListItem: ExceptionListItemSchema[] = [];
let esClient: ElasticsearchClient;
let telemetryReceiver: ITelemetryReceiver;
let deferred: Defer[] = [];

beforeAll(async () => {
await removeFile(logFilePath);
@@ -90,6 +100,9 @@ describe('telemetry tasks', () => {
});

esClient = kibanaServer.coreStart.elasticsearch.client.asInternalUser;

expect(securitySolutionPlugin).toHaveBeenCalledTimes(1);
telemetryReceiver = getTelemetryReceiver(securitySolutionPlugin);
});

afterAll(async () => {
@@ -104,6 +117,7 @@ describe('telemetry tasks', () => {
beforeEach(async () => {
jest.clearAllMocks();
mockAxiosGet();
deferred = [];
});

afterEach(async () => {
@@ -120,6 +134,13 @@ describe('telemetry tasks', () => {
exceptionsListItem = [];
});
await cleanupMockedEndpointAlerts(kibanaServer.coreStart.elasticsearch.client.asInternalUser);
deferred.forEach((d) => {
try {
d();
} catch (e) {
// ignore errors
}
});
});

describe('detection-rules', () => {
@@ -143,14 +164,13 @@ describe('telemetry tasks', () => {

it('should send task metrics', async () => {
const task = await mockAndScheduleDetectionRulesTask();
const started = performance.now();

const requests = await getTaskMetricsRequests(task);
const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBeGreaterThan(0);
requests.forEach(({ body }) => {
const asJson = JSON.parse(body);
expect(asJson).not.toBeFalsy();
expect(asJson.passed).toEqual(true);
requests.forEach((t) => {
expect(t.taskMetric.passed).toEqual(true);
});
});
});
@@ -159,13 +179,14 @@ describe('telemetry tasks', () => {
it('should use legacy sender by default', async () => {
// launch a random task and verify it uses the new configuration
const task = await mockAndScheduleDetectionRulesTask();
const started = performance.now();

const requests = await getTaskMetricsRequests(task);
const requests = await getTaskMetricsRequests(task, started);
expect(requests.length).toBeGreaterThan(0);
requests.forEach(({ config }) => {
expect(config).not.toBeFalsy();
if (config && config.headers) {
expect(config.headers['X-Telemetry-Sender']).not.toEqual('async');
requests.forEach((r) => {
expect(r.requestConfig).not.toBeFalsy();
if (r.requestConfig && r.requestConfig.headers) {
expect(r.requestConfig.headers['X-Telemetry-Sender']).not.toEqual('async');
}
});
});
@@ -188,13 +209,14 @@ describe('telemetry tasks', () => {
});

const task = await mockAndScheduleDetectionRulesTask();
const started = performance.now();

const requests = await getTaskMetricsRequests(task);
const requests = await getTaskMetricsRequests(task, started);
expect(requests.length).toBeGreaterThan(0);
requests.forEach(({ config }) => {
expect(config).not.toBeFalsy();
if (config && config.headers) {
expect(config.headers['X-Telemetry-Sender']).toEqual('async');
requests.forEach((r) => {
expect(r.requestConfig).not.toBeFalsy();
if (r.requestConfig && r.requestConfig.headers) {
expect(r.requestConfig.headers['X-Telemetry-Sender']).toEqual('async');
}
});
});
@@ -260,23 +282,405 @@ describe('telemetry tasks', () => {
it('should execute when scheduled', async () => {
await mockAndScheduleEndpointTask();

const body = await eventually(async () => {
const found = mockedAxiosPost.mock.calls.find(([url]) => {
return url.startsWith(ENDPOINT_STAGING) && url.endsWith(TELEMETRY_CHANNEL_ENDPOINT_META);
});
const endpointMetaRequests = await getEndpointMetaRequests();

expect(found).not.toBeFalsy();
expect(endpointMetaRequests.length).toBe(2);

return JSON.parse((found ? found[1] : '{}') as string);
});
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual(endpointMetaTelemetryRequest.endpoint_meta);
expect(body.policy_config).toStrictEqual(endpointMetaTelemetryRequest.policy_config);
expect(body.policy_response).toStrictEqual(endpointMetaTelemetryRequest.policy_response);
});

it('should manage runtime errors searching endpoint metrics', async () => {
const fetchEndpointMetricsAbstract = telemetryReceiver.fetchEndpointMetricsAbstract;
deferred.push(() => {
telemetryReceiver.fetchEndpointMetricsAbstract = fetchEndpointMetricsAbstract;
});

const errorMessage = 'Something went wront';

telemetryReceiver.fetchEndpointMetricsAbstract = jest.fn((_) =>
Promise.reject(Error(errorMessage))
);

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(false);
expect(metric.taskMetric.error_message).toBe(errorMessage);
});

it('should manage runtime errors searching fleet agents', async () => {
const receiver: TelemetryReceiver = telemetryReceiver as TelemetryReceiver;
const agentClient = receiver['agentClient']!;
const listAgents = agentClient.listAgents;
deferred.push(() => {
agentClient.listAgents = listAgents;
});

const errorMessage = 'Error searching for fleet agents';

agentClient.listAgents = jest.fn((_) => Promise.reject(Error(errorMessage)));

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

expect(endpointMetaRequests.length).toBe(2);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual(endpointMetaTelemetryRequest.endpoint_meta);
expect(body.policy_config).toStrictEqual({});
expect(body.policy_response).toStrictEqual({});

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
});

it('should work without fleet agents', async () => {
const receiver: TelemetryReceiver = telemetryReceiver as TelemetryReceiver;
const agentClient = receiver['agentClient']!;
const listAgents = agentClient.listAgents;
deferred.push(() => {
agentClient.listAgents = listAgents;
});

agentClient.listAgents = jest.fn((_) =>
Promise.resolve({
agents: [],
total: 0,
page: 0,
perPage: 0,
})
);

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

expect(endpointMetaRequests.length).toBe(2);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual(endpointMetaTelemetryRequest.endpoint_meta);
expect(body.policy_config).toStrictEqual({});
expect(body.policy_response).toStrictEqual({});

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
// expect(metric.error_message).toBe(errorMessage);
});

it('should manage runtime errors policy configs', async () => {
const errorMessage = 'Error getting policy configs';
const fetchPolicyConfigs = telemetryReceiver.fetchPolicyConfigs;
deferred.push(() => {
telemetryReceiver.fetchPolicyConfigs = fetchPolicyConfigs;
});

telemetryReceiver.fetchPolicyConfigs = jest.fn((_) => Promise.reject(Error(errorMessage)));

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

expect(endpointMetaRequests.length).toBe(2);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual(endpointMetaTelemetryRequest.endpoint_meta);
expect(body.policy_config).toStrictEqual({});
expect(body.policy_response).toStrictEqual({});

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
});

it('should manage unexpected errors dealing with policy configs', async () => {
const fetchPolicyConfigs = telemetryReceiver.fetchPolicyConfigs;
deferred.push(() => {
telemetryReceiver.fetchPolicyConfigs = fetchPolicyConfigs;
});

telemetryReceiver.fetchPolicyConfigs = jest.fn((_) => {
return Promise.resolve({
package_policies: [
{
invalid: 'value',
inputs: [
{
unexpected: 'boom!',
},
],
},
],
} as unknown as AgentPolicy);
});

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

expect(endpointMetaRequests.length).toBe(2);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual(endpointMetaTelemetryRequest.endpoint_meta);
expect(body.policy_config).toStrictEqual({});
expect(body.policy_response).toStrictEqual({});

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
});

it('should manage runtime errors fetching policy responses', async () => {
const errorMessage = 'Error getting policy responses';
const fetchEndpointPolicyResponses = telemetryReceiver.fetchEndpointPolicyResponses;
deferred.push(() => {
telemetryReceiver.fetchEndpointPolicyResponses = fetchEndpointPolicyResponses;
});

telemetryReceiver.fetchEndpointPolicyResponses = jest.fn((_from, _to) => {
return Promise.reject(Error(errorMessage));
});

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

expect(endpointMetaRequests.length).toBe(2);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual(endpointMetaTelemetryRequest.endpoint_meta);
expect(body.policy_config).toStrictEqual(endpointMetaTelemetryRequest.policy_config);
expect(body.policy_response).toStrictEqual({});

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
});

it('should manage work with no policy responses', async () => {
const fetchEndpointPolicyResponses = telemetryReceiver.fetchEndpointPolicyResponses;
deferred.push(() => {
telemetryReceiver.fetchEndpointPolicyResponses = fetchEndpointPolicyResponses;
});

telemetryReceiver.fetchEndpointPolicyResponses = jest.fn((_from, _to) => {
return Promise.resolve(new Map());
});

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

expect(endpointMetaRequests.length).toBe(2);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual(endpointMetaTelemetryRequest.endpoint_meta);
expect(body.policy_config).toStrictEqual(endpointMetaTelemetryRequest.policy_config);
expect(body.policy_response).toStrictEqual({});

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
});

it('should manage runtime errors fetching endpoint metadata', async () => {
const errorMessage = 'Error getting policy responses';
const fetchEndpointMetadata = telemetryReceiver.fetchEndpointMetadata;
deferred.push(() => {
telemetryReceiver.fetchEndpointMetadata = fetchEndpointMetadata;
});

telemetryReceiver.fetchEndpointMetadata = jest.fn((_from, _to) => {
return Promise.reject(Error(errorMessage));
});

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

expect(endpointMetaRequests.length).toBe(2);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual({
...endpointMetaTelemetryRequest.endpoint_meta,
capabilities: [],
});
expect(body.policy_config).toStrictEqual(endpointMetaTelemetryRequest.policy_config);
expect(body.policy_response).toStrictEqual(endpointMetaTelemetryRequest.policy_response);

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
});

it('should work with no endpoint metadata', async () => {
const fetchEndpointMetadata = telemetryReceiver.fetchEndpointMetadata;
deferred.push(() => {
telemetryReceiver.fetchEndpointMetadata = fetchEndpointMetadata;
});

telemetryReceiver.fetchEndpointMetadata = jest.fn((_from, _to) => {
return Promise.resolve(new Map());
});

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

expect(endpointMetaRequests.length).toBe(2);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual({
...endpointMetaTelemetryRequest.endpoint_meta,
capabilities: [],
});
expect(body.policy_config).toStrictEqual(endpointMetaTelemetryRequest.policy_config);
expect(body.policy_response).toStrictEqual(endpointMetaTelemetryRequest.policy_response);

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
});

it('should manage runtime errors fetching paginating endpoint metrics documents', async () => {
const receiver: TelemetryReceiver = telemetryReceiver as TelemetryReceiver;
const docsPerPage = receiver['docsPerPage']!;
const nextPage = receiver['nextPage']!;
deferred.push(() => {
receiver['docsPerPage'] = docsPerPage;
receiver['nextPage'] = nextPage;
});

// force to pull one doc at a time
receiver['docsPerPage'] = jest.fn((_index, _query) => {
return Promise.resolve(1);
});
let pagesServed = 0;
receiver['nextPage'] = jest.fn(async (query) => {
// fail requesting the second doc
if (pagesServed++ >= 1) {
return Promise.reject(Error('Boom!'));
}
return esClient.search(query);
});

const task = await mockAndScheduleEndpointTask();
const started = performance.now();

const endpointMetaRequests = await getEndpointMetaRequests();

// only one doc processed
expect(endpointMetaRequests.length).toBe(1);
const body = endpointMetaRequests[0];

expect(body.endpoint_metrics).toStrictEqual(endpointMetaTelemetryRequest.endpoint_metrics);
expect(body.endpoint_meta).toStrictEqual(endpointMetaTelemetryRequest.endpoint_meta);
expect(body.policy_config).toStrictEqual(endpointMetaTelemetryRequest.policy_config);
expect(body.policy_response).toStrictEqual(endpointMetaTelemetryRequest.policy_response);

const requests = await getTaskMetricsRequests(task, started);

expect(requests.length).toBe(1);

const metric = requests[0];

expect(metric).not.toBeFalsy();
expect(metric.taskMetric.passed).toBe(true);
});
});

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async function getEndpointMetaRequests(atLeast: number = 1): Promise<any[]> {
return eventually(async () => {
const found = mockedAxiosPost.mock.calls.filter(([url]) => {
return url.startsWith(ENDPOINT_STAGING) && url.endsWith(TELEMETRY_CHANNEL_ENDPOINT_META);
});

expect(found).not.toBeFalsy();
expect(found.length).toBeGreaterThanOrEqual(atLeast);

return (found ?? []).flatMap((req) => {
const ndjson = req[1] as string;
return ndjson
.split('\n')
.filter((l) => l.trim().length > 0)
.map((l) => {
return JSON.parse(l);
});
});
});
}

async function mockAndScheduleDetectionRulesTask(): Promise<SecurityTelemetryTask> {
const task = getTelemetryTask(tasks, 'security:telemetry-detection-rules');

@@ -327,6 +731,12 @@ describe('telemetry tasks', () => {
}

function mockAxiosGet(bufferConfig: unknown = fakeBufferAndSizesConfigAsyncDisabled) {
mockedAxiosPost.mockImplementation(
async (_url: string, _data?: unknown, _config?: AxiosRequestConfig<unknown> | undefined) => {
return { status: 200 };
}
);

mockedAxiosGet.mockImplementation(async (url: string) => {
if (url.startsWith(ENDPOINT_STAGING) && url.endsWith('ping')) {
return { status: 200 };
@@ -345,11 +755,13 @@ describe('telemetry tasks', () => {
});
}

async function getTaskMetricsRequests(task: SecurityTelemetryTask): Promise<
async function getTaskMetricsRequests(
task: SecurityTelemetryTask,
olderThan: number
): Promise<
Array<{
url: string;
body: string;
config: AxiosRequestConfig<unknown> | undefined;
taskMetric: TaskMetric;
requestConfig: AxiosRequestConfig<unknown> | undefined;
}>
> {
return eventually(async () => {
@@ -367,7 +779,14 @@ describe('telemetry tasks', () => {
);
});
expect(requests.length).toBeGreaterThan(0);
return requests;
return requests
.map((r) => {
return {
taskMetric: JSON.parse(r.body) as TaskMetric,
requestConfig: r.config,
};
})
.filter((t) => t.taskMetric.start_time >= olderThan);
});
}
});

@@ -117,20 +117,21 @@ export const createMockTelemetryReceiver = (
getClusterInfo: jest.fn().mockReturnValue(stubClusterInfo),
fetchLicenseInfo: jest.fn().mockReturnValue(stubLicenseInfo),
copyLicenseFields: jest.fn(),
fetchFleetAgents: jest.fn().mockReturnValue(stubFleetAgentResponse),
fetchPolicyConfigs: jest.fn().mockReturnValue(Promise.resolve(null)),
fetchFleetAgents: jest.fn().mockReturnValue(Promise.resolve(stubFleetAgentResponse)),
openPointInTime: jest.fn().mockReturnValue(Promise.resolve('test-pit-id')),
getAlertsIndex: jest.fn().mockReturnValue('alerts-*'),
fetchDiagnosticAlertsBatch: jest.fn().mockReturnValue(diagnosticsAlert ?? jest.fn()),
getExperimentalFeatures: jest.fn().mockReturnValue(undefined),
fetchEndpointMetricsAbstract: jest.fn().mockReturnValue(stubEndpointMetricsAbstractResponse),
fetchEndpointMetricsById: jest.fn().mockReturnValue(stubEndpointMetricsByIdResponse),
fetchEndpointPolicyResponses: jest.fn(),
fetchEndpointPolicyResponses: jest.fn().mockReturnValue(Promise.resolve(new Map())),
fetchPrebuiltRuleAlertsBatch: jest.fn().mockReturnValue(prebuiltRuleAlertsResponse),
fetchDetectionRulesPackageVersion: jest.fn(),
fetchTrustedApplications: jest.fn(),
fetchEndpointList: jest.fn(),
fetchDetectionRules: jest.fn().mockReturnValue({ body: null }),
fetchEndpointMetadata: jest.fn(),
fetchEndpointMetadata: jest.fn().mockReturnValue(Promise.resolve(new Map())),
fetchTimelineAlerts: jest.fn().mockReturnValue(Promise.resolve(stubEndpointAlertResponse())),
buildProcessTree: jest.fn().mockReturnValue(processTreeResponse),
fetchTimelineEvents: jest.fn().mockReturnValue(Promise.resolve(stubFetchTimelineEvents())),

@@ -306,8 +306,11 @@ export const tlog = (logger: Logger, message: string, meta?: LogMeta) => {
telemetryLogger(logger, message, meta);
};

export const newTelemetryLogger = (logger: Logger): TelemetryLogger => {
return new TelemetryLoggerImpl(logger);
export const newTelemetryLogger = (
logger: Logger,
mdc?: LogMeta | object | undefined
): TelemetryLogger => {
return new TelemetryLoggerImpl(logger, mdc);
};

function obfuscateString(clusterId: string, toHash: string): string {

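The new optional mdc parameter lets a caller bind a fixed set of structured fields to every log line emitted through the returned logger. The endpoint task later in this diff uses it exactly this way; a minimal usage sketch, reusing identifiers from that task's closure:

// Every l()/debug()/warn() call on `log` now carries task_id and
// task_execution_period without repeating them at each call site.
const mdc = { task_id: taskId, task_execution_period: taskExecutionPeriod };
const log = newTelemetryLogger(logger.get('endpoint'), mdc);
log.l('Running telemetry task');
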
@@ -53,8 +53,8 @@ import {
exceptionListItemToTelemetryEntry,
trustedApplicationToTelemetryEntry,
ruleExceptionListItemToTelemetryEvent,
tlog,
setClusterInfo,
newTelemetryLogger,
} from './helpers';
import { Fetcher } from '../../endpoint/routes/resolver/tree/utils/fetch';
import type { TreeOptions, TreeResponse } from '../../endpoint/routes/resolver/tree/utils/fetch';
@@ -85,6 +85,7 @@ import { telemetryConfiguration } from './configuration';
import { ENDPOINT_METRICS_INDEX } from '../../../common/constants';
import { PREBUILT_RULES_PACKAGE_NAME } from '../../../common/detection_engine/constants';
import { DEFAULT_DIAGNOSTIC_INDEX } from './constants';
import type { TelemetryLogger } from './telemetry_logger';

export interface ITelemetryReceiver {
start(
@@ -234,7 +235,7 @@ export interface ITelemetryReceiver {
}

export class TelemetryReceiver implements ITelemetryReceiver {
private readonly logger: Logger;
private readonly logger: TelemetryLogger;
private agentClient?: AgentClient;
private agentPolicyService?: AgentPolicyServiceInterface;
private _esClient?: ElasticsearchClient;
@@ -254,7 +255,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
private numDocsToSample: number = 10;

constructor(logger: Logger) {
this.logger = logger.get('telemetry_events.receiver');
this.logger = newTelemetryLogger(logger.get('telemetry_events.receiver'));
}

public async start(
@@ -528,7 +529,10 @@ export class TelemetryReceiver implements ITelemetryReceiver {
}

public async *fetchDiagnosticAlertsBatch(executeFrom: string, executeTo: string) {
tlog(this.logger, `Searching diagnostic alerts from ${executeFrom} to ${executeTo}`);
this.logger.l('Searching diagnostic alerts', {
from: executeFrom,
to: executeTo,
});

let pitId = await this.openPointInTime(DEFAULT_DIAGNOSTIC_INDEX);
let fetchMore = true;
@@ -569,10 +573,10 @@ export class TelemetryReceiver implements ITelemetryReceiver {
fetchMore = false;
}

tlog(this.logger, `Diagnostic alerts to return: ${numOfHits}`);
this.logger.l('Diagnostic alerts to return', { numOfHits });
fetchMore = numOfHits > 0 && numOfHits < telemetryConfiguration.telemetry_max_buffer_size;
} catch (e) {
tlog(this.logger, e);
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
fetchMore = false;
}
@@ -741,7 +745,10 @@ export class TelemetryReceiver implements ITelemetryReceiver {
}

public async *fetchPrebuiltRuleAlertsBatch(executeFrom: string, executeTo: string) {
tlog(this.logger, `Searching prebuilt rule alerts from ${executeFrom} to ${executeTo}`);
this.logger.l('Searching prebuilt rule alerts from', {
executeFrom,
executeTo,
});

let pitId = await this.openPointInTime(DEFAULT_DIAGNOSTIC_INDEX);
let fetchMore = true;
@@ -876,14 +883,14 @@ export class TelemetryReceiver implements ITelemetryReceiver {
pitId = response?.pit_id;
}

tlog(this.logger, `Prebuilt rule alerts to return: ${alerts.length}`);
this.logger.l('Prebuilt rule alerts to return', { alerts: alerts.length });

yield alerts;
}
} catch (e) {
// to keep backward compatibility with the previous implementation, silent return
// once we start using `paginate` this error should be managed downstream
tlog(this.logger, e);
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
return;
} finally {
await this.closePointInTime(pitId);
@@ -907,7 +914,10 @@ export class TelemetryReceiver implements ITelemetryReceiver {
try {
await this.esClient().closePointInTime({ id: pitId });
} catch (error) {
tlog(this.logger, `Error trying to close point in time: "${pitId}". Error is: "${error}"`);
this.logger.l('Error trying to close point in time', {
pit: pitId,
error: JSON.stringify(error),
});
}
}
@@ -993,7 +1003,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {

fetchMore = numOfHits > 0;
} catch (e) {
tlog(this.logger, e);
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
fetchMore = false;
}
@@ -1008,13 +1018,15 @@ export class TelemetryReceiver implements ITelemetryReceiver {
try {
await this.esClient().closePointInTime({ id: pitId });
} catch (error) {
tlog(
this.logger,
`Error trying to close point in time: "${pitId}", it will expire within "${keepAlive}". Error is: "${error}"`
);
this.logger.l('Error trying to close point in time', {
pit: pitId,
error: JSON.stringify(error),
keepAlive,
});
}

tlog(this.logger, `Timeline alerts to return: ${alertsToReturn.length}`);
this.logger.l('Timeline alerts to return', { alerts: alertsToReturn.length });

return alertsToReturn || [];
}
@@ -1202,7 +1214,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {

return ret.license;
} catch (err) {
tlog(this.logger, `failed retrieving license: ${err}`);
this.logger.l('failed retrieving license', { error: JSON.stringify(err) });
return undefined;
}
}
@@ -1240,14 +1252,14 @@ export class TelemetryReceiver implements ITelemetryReceiver {
const pit = {
id: await this.openPointInTime(index),
};
const esQuery = {
const esQuery: ESSearchRequest = {
...cloneDeep(query),
pit,
size: Math.min(size, 10_000),
};
try {
do {
const response = await this.esClient().search(esQuery);
const response = await this.nextPage(esQuery);
const hits = response?.hits.hits.length ?? 0;

if (hits === 0) {
@@ -1263,13 +1275,19 @@ export class TelemetryReceiver implements ITelemetryReceiver {
yield data;
} while (esQuery.search_after !== undefined);
} catch (e) {
tlog(this.logger, `Error running paginated query: ${e}`);
this.logger.l('Error running paginated query', { error: JSON.stringify(e) });
throw e;
} finally {
await this.closePointInTime(pit.id);
}
}

private async nextPage(
esQuery: ESSearchRequest
): Promise<SearchResponse<unknown, Record<string, AggregationsAggregate>>> {
return this.esClient().search(esQuery);
}

public setMaxPageSizeBytes(bytes: number) {
this.maxPageSizeBytes = bytes;
}

@@ -13,11 +13,9 @@ import {
TelemetryCounter,
type EndpointMetadataDocument,
type EndpointMetricDocument,
type EndpointMetricsAbstract,
type EndpointPolicyResponseDocument,
type ESClusterInfo,
type ESLicense,
type FleetAgentResponse,
type Nullable,
} from '../types';
import type { ITelemetryReceiver } from '../receiver';
@@ -35,18 +33,14 @@ import {
} from '../helpers';
import type { TelemetryLogger } from '../telemetry_logger';
import type { PolicyData } from '../../../../common/endpoint/types';
import { telemetryConfiguration } from '../configuration';
import { TELEMETRY_CHANNEL_ENDPOINT_META } from '../constants';

// Endpoint agent uses this Policy ID while it's installing.
/**
 * Endpoint agent uses this Policy ID while it's installing.
 */
const DefaultEndpointPolicyIdToIgnore = '00000000-0000-0000-0000-000000000000';

const EmptyFleetAgentResponse: FleetAgentResponse = {
agents: [],
total: 0,
page: 0,
perPage: 0,
};

const usageLabelPrefix: string[] = ['security_telemetry', 'endpoint_task'];

export function createTelemetryEndpointTaskConfig(maxTelemetryBatch: number) {
@@ -66,36 +60,16 @@ export function createTelemetryEndpointTaskConfig(maxTelemetryBatch: number) {
taskMetricsService: ITaskMetricsService,
taskExecutionPeriod: TaskExecutionPeriod
) => {
const log = newTelemetryLogger(logger.get('endpoint'));
const mdc = { task_id: taskId, task_execution_period: taskExecutionPeriod };
const log = newTelemetryLogger(logger.get('endpoint'), mdc);
const trace = taskMetricsService.start(taskType);

log.l(
`Running task: ${taskId} [last: ${taskExecutionPeriod.last} - current: ${taskExecutionPeriod.current}]`
);
log.l('Running telemetry task');

try {
if (!taskExecutionPeriod.last) {
throw new Error('last execution timestamp is required');
}
const processor = new EndpointMetadataProcessor(log, receiver);

const clusterData = await fetchClusterData(receiver);

const endpointData = await fetchEndpointData(
receiver,
taskExecutionPeriod.last,
taskExecutionPeriod.current
);

/**
 * STAGE 1 - Fetch Endpoint Agent Metrics
 * If no metrics exist, then abort execution, otherwise increment
 * the usage counter and continue.
 */
if (endpointData.endpointMetrics.totalEndpoints === 0) {
log.l('no endpoint metrics to report');
taskMetricsService.end(trace);
return 0;
}
const documents = await processor.process(taskExecutionPeriod);

const telemetryUsageCounter = sender.getTelemetryUsageCluster();
telemetryUsageCounter?.incrementCounter({
@@ -103,74 +77,30 @@ export function createTelemetryEndpointTaskConfig(maxTelemetryBatch: number) {
usageLabelPrefix.concat(['payloads', TelemetryChannel.ENDPOINT_META])
),
counterType: TelemetryCounter.NUM_ENDPOINT,
incrementBy: endpointData.endpointMetrics.totalEndpoints,
incrementBy: documents.length,
});

/**
 * STAGE 2
 * - Fetch Fleet Agent Config
 * - Ignore policy used while installing the endpoint agent.
 * - Fetch Endpoint Policy Configs
 */
const policyIdByAgent = endpointData.policyIdByAgent;
endpointData.policyIdByAgent.delete(DefaultEndpointPolicyIdToIgnore);
const endpointPolicyById = await endpointPolicies(policyIdByAgent.values(), receiver, log);
log.l('Sending endpoint telemetry', {
num_docs: documents.length,
async_sender: telemetryConfiguration.use_async_sender,
});

/**
 * STAGE 3 - Fetch Endpoint Policy Responses
 */
const policyResponses = endpointData.epPolicyResponse;
if (policyResponses.size === 0) {
log.l('no endpoint policy responses to report');
}

/**
 * STAGE 4 - Fetch Endpoint Agent Metadata
 */
const endpointMetadata = endpointData.endpointMetadata;
if (endpointMetadata.size === 0) {
log.l(`no endpoint metadata to report`);
}

/** STAGE 5 - Create the telemetry log records
 *
 * Iterates through the endpoint metrics documents at STAGE 1 and joins them together
 * to form the telemetry log that is sent back to Elastic Security developers to
 * make improvements to the product.
 */
const mappingContext = {
policyIdByAgent,
endpointPolicyById,
policyResponses,
endpointMetadata,
taskExecutionPeriod,
clusterData,
};
const telemetryPayloads = [];
for await (const metrics of receiver.fetchEndpointMetricsById(
endpointData.endpointMetrics.endpointMetricIds
)) {
const payloads = metrics.map((endpointMetric) =>
mapEndpointMetric(endpointMetric, mappingContext)
);
telemetryPayloads.push(...payloads);
}

log.l(`sending ${telemetryPayloads.length} endpoint telemetry records`);

/**
 * STAGE 6 - Send the documents
 *
 * Send the documents in a batches of maxTelemetryBatch
 */
const batches = batchTelemetryRecords(telemetryPayloads, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_ENDPOINT_META, batch);
// STAGE 6 - Send the documents
if (telemetryConfiguration.use_async_sender) {
sender.sendAsync(TelemetryChannel.ENDPOINT_META, documents);
} else {
const batches = batchTelemetryRecords(documents, maxTelemetryBatch);
for (const batch of batches) {
await sender.sendOnDemand(TELEMETRY_CHANNEL_ENDPOINT_META, batch);
}
}
taskMetricsService.end(trace);
return telemetryPayloads.length;

return documents.length;
} catch (err) {
log.warn(`could not complete endpoint alert telemetry task due to ${err?.message}`, err);
log.l(`Error running endpoint alert telemetry task`, {
error: JSON.stringify(err),
});
taskMetricsService.end(trace, err);
return 0;
}
@@ -178,169 +108,259 @@ export function createTelemetryEndpointTaskConfig(maxTelemetryBatch: number) {
};
}

async function fetchEndpointData(
receiver: ITelemetryReceiver,
executeFrom: string,
executeTo: string
): Promise<{
policyIdByAgent: Map<string, string>;
endpointMetrics: EndpointMetricsAbstract;
epPolicyResponse: Map<string, EndpointPolicyResponseDocument>;
endpointMetadata: Map<string, EndpointMetadataDocument>;
}> {
const [policyIdByAgent, epMetricsAbstractResponse, policyResponse, endpointMetadata] =
await Promise.allSettled([
receiver.fetchFleetAgents(),
receiver.fetchEndpointMetricsAbstract(executeFrom, executeTo),
receiver.fetchEndpointPolicyResponses(executeFrom, executeTo),
receiver.fetchEndpointMetadata(executeFrom, executeTo),
]);
class EndpointMetadataProcessor {
private readonly logger: TelemetryLogger;

return {
policyIdByAgent: safeValue(policyIdByAgent, EmptyFleetAgentResponse),
endpointMetrics: safeValue(epMetricsAbstractResponse),
epPolicyResponse: safeValue(policyResponse),
endpointMetadata: safeValue(endpointMetadata),
};
}
constructor(logger: Logger, private readonly receiver: ITelemetryReceiver) {
this.logger = newTelemetryLogger(logger.get('processor'));
}

async function fetchClusterData(
receiver: ITelemetryReceiver
): Promise<{ clusterInfo: ESClusterInfo; licenseInfo: Nullable<ESLicense> }> {
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
receiver.fetchClusterInfo(),
receiver.fetchLicenseInfo(),
]);
public async process(taskExecutionPeriod: TaskExecutionPeriod): Promise<object[]> {
const last = taskExecutionPeriod.last;
const current = taskExecutionPeriod.current;

const clusterInfo = safeValue(clusterInfoPromise);
const licenseInfo = safeValue(licenseInfoPromise);
if (!last) {
throw new Error('last execution timestamp is required');
}

return { clusterInfo, licenseInfo };
}
// STAGE 1 - Fetch Endpoint Agent Metrics
const endpointMetrics = await this.receiver.fetchEndpointMetricsAbstract(last, current);
// If no metrics exist, early (and successfull) exit
if (endpointMetrics.totalEndpoints === 0) {
this.logger.l('no endpoint metrics to report');
return [];
}

async function endpointPolicies(
policyIds: IterableIterator<string>,
receiver: ITelemetryReceiver,
log: TelemetryLogger
) {
const endpointPolicyCache = new Map<string, PolicyData>();
for (const policyId of policyIds) {
if (policyId !== null && policyId !== undefined && !endpointPolicyCache.has(policyId)) {
const agentPolicy = await receiver.fetchPolicyConfigs(policyId).catch((e) => {
log.l(`error fetching policy config due to ${e?.message}`);
return null;
/**
 * STAGE 2
 * - Fetch Fleet Agent Config
 * - Ignore policy used while installing the endpoint agent.
 * - Fetch Endpoint Policy Configs
 */
const policyIdByFleetAgentId = await this.receiver
.fetchFleetAgents()
.then((policies) => {
policies.delete(DefaultEndpointPolicyIdToIgnore);
return policies;
})
.catch((e) => {
this.logger.l('Error fetching fleet agents, using an empty value', {
error: JSON.stringify(e),
});
return new Map();
});
const endpointPolicyById = await this.endpointPolicies(policyIdByFleetAgentId.values());

/**
 * STAGE 3 - Fetch Endpoint Policy Responses
 */
const policyResponses = await this.receiver
.fetchEndpointPolicyResponses(last, current)
.then((response) => {
if (response.size === 0) {
this.logger.l('no endpoint policy responses to report');
}
return response;
})
.catch((e) => {
this.logger.l('Error fetching policy responses, using an empty value', {
error: JSON.stringify(e),
});
return new Map();
});

const packagePolicies = agentPolicy?.package_policies;
/**
 * STAGE 4 - Fetch Endpoint Agent Metadata
 */
const endpointMetadata = await this.receiver
.fetchEndpointMetadata(last, current)
.then((response) => {
if (response.size === 0) {
this.logger.l('no endpoint metadata to report');
}
return response;
})
.catch((e) => {
this.logger.l('Error fetching endpoint metadata, using an empty value', {
error: JSON.stringify(e),
});
return new Map();
});

if (packagePolicies !== undefined && isPackagePolicyList(packagePolicies)) {
packagePolicies
.map((pPolicy) => pPolicy as PolicyData)
.forEach((pPolicy) => {
if (pPolicy.inputs[0]?.config !== undefined && pPolicy.inputs[0]?.config !== null) {
pPolicy.inputs.forEach((input) => {
if (
input.type === FLEET_ENDPOINT_PACKAGE &&
input?.config !== undefined &&
policyId !== undefined
) {
endpointPolicyCache.set(policyId, pPolicy);
}
});
}
});
/** STAGE 5 - Create the telemetry log records
 *
 * Iterates through the endpoint metrics documents at STAGE 1 and joins them together
 * to form the telemetry log that is sent back to Elastic Security developers to
 * make improvements to the product.
 */
const clusterData = await this.fetchClusterData();
const mappingContext = {
policyIdByFleetAgentId,
endpointPolicyById,
policyResponses,
endpointMetadata,
taskExecutionPeriod,
clusterData,
};
const telemetryPayloads: object[] = [];
try {
for await (const metrics of this.receiver.fetchEndpointMetricsById(
endpointMetrics.endpointMetricIds
)) {
const payloads = metrics.map((endpointMetric) =>
this.mapEndpointMetric(endpointMetric, mappingContext)
);
telemetryPayloads.push(...payloads);
}
} catch (e) {
// something happened in the middle of the pagination, log the error
// and return what we collect so far instead of aborting the
// whole execution
this.logger.l('Error fetching endpoint metrics by id', {
error: JSON.stringify(e),
});
}

return telemetryPayloads;
}

private async fetchClusterData(): Promise<{
clusterInfo: ESClusterInfo;
licenseInfo: Nullable<ESLicense>;
}> {
const [clusterInfoPromise, licenseInfoPromise] = await Promise.allSettled([
this.receiver.fetchClusterInfo(),
this.receiver.fetchLicenseInfo(),
]);

const clusterInfo = safeValue(clusterInfoPromise);
const licenseInfo = safeValue(licenseInfoPromise);

return { clusterInfo, licenseInfo };
}

private async endpointPolicies(policies: IterableIterator<string>) {
const endpointPolicyCache = new Map<string, PolicyData>();
for (const policyId of policies) {
if (!endpointPolicyCache.has(policyId)) {
const agentPolicy = await this.receiver.fetchPolicyConfigs(policyId).catch((e) => {
this.logger.l(`error fetching policy config due to ${e?.message}`);
return null;
});

const packagePolicies = agentPolicy?.package_policies;

if (packagePolicies !== undefined && isPackagePolicyList(packagePolicies)) {
packagePolicies
.map((pPolicy) => pPolicy as PolicyData)
.forEach((pPolicy) => {
if (pPolicy.inputs[0]?.config !== undefined && pPolicy.inputs[0]?.config !== null) {
pPolicy.inputs.forEach((input) => {
if (
input.type === FLEET_ENDPOINT_PACKAGE &&
input?.config !== undefined &&
policyId !== undefined
) {
endpointPolicyCache.set(policyId, pPolicy);
}
});
}
});
}
}
}
return endpointPolicyCache;
}
return endpointPolicyCache;
}

function mapEndpointMetric(
endpointMetric: EndpointMetricDocument,
ctx: {
policyIdByAgent: Map<string, string>;
endpointPolicyById: Map<string, PolicyData>;
policyResponses: Map<string, EndpointPolicyResponseDocument>;
endpointMetadata: Map<string, EndpointMetadataDocument>;
taskExecutionPeriod: TaskExecutionPeriod;
clusterData: { clusterInfo: ESClusterInfo; licenseInfo: Nullable<ESLicense> };
}
) {
let policyConfig = null;
let failedPolicy: Nullable<EndpointPolicyResponseDocument> = null;
let endpointMetadataById = null;

const fleetAgentId = endpointMetric.elastic.agent.id;
const endpointAgentId = endpointMetric.agent.id;

const policyId = ctx.policyIdByAgent.get(fleetAgentId);
if (policyId) {
policyConfig = ctx.endpointPolicyById.get(policyId) || null;

if (policyConfig) {
failedPolicy = ctx.policyResponses.get(endpointAgentId);
private mapEndpointMetric(
endpointMetric: EndpointMetricDocument,
ctx: {
policyIdByFleetAgentId: Map<string, string>;
endpointPolicyById: Map<string, PolicyData>;
policyResponses: Map<string, EndpointPolicyResponseDocument>;
endpointMetadata: Map<string, EndpointMetadataDocument>;
taskExecutionPeriod: TaskExecutionPeriod;
clusterData: { clusterInfo: ESClusterInfo; licenseInfo: Nullable<ESLicense> };
}
}
) {
let policyConfig = null;
let failedPolicy: Nullable<EndpointPolicyResponseDocument> = null;
let endpointMetadataById = null;

if (ctx.endpointMetadata) {
endpointMetadataById = ctx.endpointMetadata.get(endpointAgentId);
}
const fleetAgentId = endpointMetric.elastic.agent.id;
const endpointAgentId = endpointMetric.agent.id;

const {
cpu,
memory,
uptime,
documents_volume: documentsVolume,
malicious_behavior_rules: maliciousBehaviorRules,
system_impact: systemImpact,
threads,
event_filter: eventFilter,
} = endpointMetric.Endpoint.metrics;
const endpointPolicyDetail = extractEndpointPolicyConfig(policyConfig);
if (endpointPolicyDetail) {
endpointPolicyDetail.value = addDefaultAdvancedPolicyConfigSettings(endpointPolicyDetail.value);
}
return {
'@timestamp': ctx.taskExecutionPeriod.current,
cluster_uuid: ctx.clusterData.clusterInfo.cluster_uuid,
cluster_name: ctx.clusterData.clusterInfo.cluster_name,
license_id: ctx.clusterData.licenseInfo?.uid,
endpoint_id: endpointAgentId,
endpoint_version: endpointMetric.agent.version,
endpoint_package_version: policyConfig?.package?.version || null,
endpoint_metrics: {
cpu: cpu.endpoint,
memory: memory.endpoint.private,
const policyId = ctx.policyIdByFleetAgentId.get(fleetAgentId);
if (policyId) {
policyConfig = ctx.endpointPolicyById.get(policyId) || null;

if (policyConfig) {
failedPolicy = ctx.policyResponses.get(endpointAgentId);
}
}

if (ctx.endpointMetadata) {
endpointMetadataById = ctx.endpointMetadata.get(endpointAgentId);
}

const {
cpu,
memory,
uptime,
documentsVolume,
maliciousBehaviorRules,
systemImpact,
documents_volume: documentsVolume,
malicious_behavior_rules: maliciousBehaviorRules,
system_impact: systemImpact,
threads,
eventFilter,
},
endpoint_meta: {
os: endpointMetric.host.os,
capabilities:
endpointMetadataById !== null && endpointMetadataById !== undefined
? endpointMetadataById.Endpoint.capabilities
: [],
},
policy_config: endpointPolicyDetail !== null ? endpointPolicyDetail : {},
policy_response:
failedPolicy !== null && failedPolicy !== undefined
? {
agent_policy_status: failedPolicy.event.agent_id_status,
manifest_version: failedPolicy.Endpoint.policy.applied.artifacts.global.version,
status: failedPolicy.Endpoint.policy.applied.status,
actions: failedPolicy.Endpoint.policy.applied.actions
.map((action) => (action.status !== 'success' ? action : null))
.filter((action) => action !== null),
configuration: failedPolicy.Endpoint.configuration,
state: failedPolicy.Endpoint.state,
}
: {},
telemetry_meta: {
metrics_timestamp: endpointMetric['@timestamp'],
},
};
event_filter: eventFilter,
} = endpointMetric.Endpoint.metrics;
const endpointPolicyDetail = extractEndpointPolicyConfig(policyConfig);
if (endpointPolicyDetail) {
endpointPolicyDetail.value = addDefaultAdvancedPolicyConfigSettings(
endpointPolicyDetail.value
);
}
return {
'@timestamp': ctx.taskExecutionPeriod.current,
cluster_uuid: ctx.clusterData.clusterInfo.cluster_uuid,
cluster_name: ctx.clusterData.clusterInfo.cluster_name,
license_id: ctx.clusterData.licenseInfo?.uid,
endpoint_id: endpointAgentId,
endpoint_version: endpointMetric.agent.version,
endpoint_package_version: policyConfig?.package?.version || null,
endpoint_metrics: {
cpu: cpu.endpoint,
memory: memory.endpoint.private,
uptime,
documentsVolume,
maliciousBehaviorRules,
systemImpact,
threads,
eventFilter,
},
endpoint_meta: {
os: endpointMetric.host.os,
capabilities:
endpointMetadataById !== null && endpointMetadataById !== undefined
? endpointMetadataById.Endpoint.capabilities
: [],
},
policy_config: endpointPolicyDetail !== null ? endpointPolicyDetail : {},
policy_response:
failedPolicy !== null && failedPolicy !== undefined
? {
agent_policy_status: failedPolicy.event.agent_id_status,
manifest_version: failedPolicy.Endpoint.policy.applied.artifacts.global.version,
status: failedPolicy.Endpoint.policy.applied.status,
actions: failedPolicy.Endpoint.policy.applied.actions
.map((action) => (action.status !== 'success' ? action : null))
.filter((action) => action !== null),
configuration: failedPolicy.Endpoint.configuration,
state: failedPolicy.Endpoint.state,
}
: {},
telemetry_meta: {
metrics_timestamp: endpointMetric['@timestamp'],
},
};
}
}

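The task code above unwraps Promise.allSettled results through a safeValue helper imported from ../helpers, called both with and without a fallback (e.g. safeValue(policyIdByAgent, EmptyFleetAgentResponse)). The helper itself is not part of this diff; a plausible shape, shown only as an assumption, is:

// Assumed shape of a safeValue-style helper: return the fulfilled value,
// fall back to a default when one is provided, otherwise rethrow the reason.
function safeValue<T>(result: PromiseSettledResult<T>, defaultValue?: T): T {
  if (result.status === 'fulfilled') {
    return result.value;
  }
  if (defaultValue !== undefined) {
    return defaultValue;
  }
  throw result.reason;
}
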
@@ -33,7 +33,10 @@ export interface TelemetryLogger extends Logger {
 * It makes easier to browse the logs by filtering by the structured argument `logger`.
 */
export class TelemetryLoggerImpl implements TelemetryLogger {
constructor(private readonly delegate: Logger) {}
constructor(
private readonly delegate: Logger,
private readonly mdc?: LogMeta | object | undefined
) {}

l<Meta extends LogMeta = LogMeta>(message: string, meta?: Meta | object | undefined): void {
if (isElasticCloudDeployment) {
@@ -44,27 +47,27 @@ export class TelemetryLoggerImpl implements TelemetryLogger {
}

trace<Meta extends LogMeta = LogMeta>(message: string, meta?: Meta): void {
this.delegate.trace(message, logMeta(meta));
this.delegate.trace(message, logMeta(meta, this.mdc));
}

debug<Meta extends LogMeta = LogMeta>(message: string, meta?: Meta): void {
this.delegate.debug(message, logMeta(meta));
this.delegate.debug(message, logMeta(meta, this.mdc));
}

info<Meta extends LogMeta = LogMeta>(message: string, meta?: Meta): void {
this.delegate.info(message, logMeta(meta));
this.delegate.info(message, logMeta(meta, this.mdc));
}

warn<Meta extends LogMeta = LogMeta>(errorOrMessage: string | Error, meta?: Meta): void {
this.delegate.warn(errorOrMessage, logMeta(meta));
this.delegate.warn(errorOrMessage, logMeta(meta, this.mdc));
}

error<Meta extends LogMeta = LogMeta>(errorOrMessage: string | Error, meta?: Meta): void {
this.delegate.error(errorOrMessage, logMeta(meta));
this.delegate.error(errorOrMessage, logMeta(meta, this.mdc));
}

fatal<Meta extends LogMeta = LogMeta>(errorOrMessage: string | Error, meta?: Meta): void {
this.delegate.fatal(errorOrMessage, logMeta(meta));
this.delegate.fatal(errorOrMessage, logMeta(meta, this.mdc));
}

log(record: LogRecord): void {
@@ -76,7 +79,8 @@ export class TelemetryLoggerImpl implements TelemetryLogger {
}

get(...childContextPaths: string[]): Logger {
return this.delegate.get(...childContextPaths);
const logger = this.delegate.get(...childContextPaths);
return new TelemetryLoggerImpl(logger, this.mdc);
}
}

@@ -89,7 +93,7 @@ export const tlog = (logger: Logger, message: string, meta?: LogMeta) => {
};

// helper method to merge a given LogMeta with the cluster info (if exists)
function logMeta(meta?: LogMeta | undefined): LogMeta {
function logMeta(meta?: LogMeta | undefined, mdc?: LogMeta | undefined): LogMeta {
const clusterInfoMeta = clusterInfo
? {
cluster_uuid: clusterInfo?.cluster_uuid,
@@ -99,5 +103,6 @@ function logMeta(meta?: LogMeta | undefined): LogMeta {
return {
...clusterInfoMeta,
...(meta ?? {}),
...(mdc ?? {}),
};
}

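With the mdc argument threaded through, logMeta spreads three sources in order: cluster info, the per-call meta, then the logger's mdc, so mdc fields win on key collisions. A small illustration of the resulting merge, with purely hypothetical values:

// Hypothetical values, showing precedence: fields from the logger's mdc win
// over per-call meta, which wins over the cluster info fields.
const merged = {
  ...{ cluster_uuid: 'uuid-from-cluster-info' }, // cluster info
  ...{ source: 'per-call-meta' },                // meta passed to the log call
  ...{ source: 'mdc' },                          // mdc bound at construction
};
// merged === { cluster_uuid: 'uuid-from-cluster-info', source: 'mdc' }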