[8.17] [8.x] [Telemetry][Security Solution] Index metadata collector (#194004) (#204311) (#204628)

# Backport

This will backport the following commits from `8.x` to `8.17`:
- [[8.x] [Telemetry][Security Solution] Index metadata collector
(#194004) (#204311)](https://github.com/elastic/kibana/pull/204311)

<!--- Backport version: 8.9.8 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Sebastián
Zaffarano","email":"sebastian.zaffarano@elastic.co"},"sourceCommit":{"committedDate":"2024-12-16T16:05:02Z","message":"[8.x]
[Telemetry][Security Solution] Index metadata collector (#194004)
(#204311)\n\n# Backport\n\nThis will backport the following commits from
`main` to `8.x`:\n- [[Telemetry][Security Solution] Index metadata
collector\n(#194004)](https://github.com/elastic/kibana/pull/194004)\n\n<!---
Backport version: 8.9.8 -->\n\n### Questions ?\nPlease refer to the
[Backport
tool\ndocumentation](https://github.com/sqren/backport)\n\n<!--BACKPORT
[{\"author\":{\"name\":\"Sebastián\nZaffarano\",\"email\":\"sebastian.zaffarano@elastic.co\"},\"sourceCommit\":{\"committedDate\":\"2024-12-13T18:31:03Z\",\"message\":\"[Telemetry][Security\nSolution]
Index metadata collector
(#194004)\\n\\n##\nSummary\\r\\n\\r\\nImplements a security_solution
task scheduled to run once\na day to\\r\\ncollect the following
information:\\r\\n\\r\\n1. Datastreams\nstats\\r\\n2. Indices
stats\\r\\n3. ILMs stats\\r\\n4. ILM configs\\r\\n\\r\\nThe\ntask allows
a runtime configuration to limit the number of\nindices\\r\\nand data
streams to analyze or event to disable the
feature\nentirely.\\r\\n\\r\\nOnce the data is gathered, the task sends
it as EBT\nevents.\\r\\n\\r\\n---------\\r\\n\\r\\nCo-authored-by:
kibanamachine\n<42973632+kibanamachine@users.noreply.github.com>\\r\\nCo-authored-by:\nElastic
Machine\n<elasticmachine@users.noreply.github.com>\",\"sha\":\"36b344a4c58a3d78a892288e0eef71e9ff163b9d\",\"branchLabelMapping\":{\"^v9.0.0$\":\"main\",\"^v8.18.0$\":\"8.x\",\"^v(\\\\d+).(\\\\d+).\\\\d+$\":\"$1.$2\"}},\"sourcePullRequest\":{\"labels\":[\"release_note:skip\",\"v9.0.0\",\"Team:\nSecuritySolution\",\"backport:all-open\",\"ci:cloud-deploy\"],\"number\":194004,\"url\":\"https://github.com/elastic/kibana/pull/194004\",\"mergeCommit\":{\"message\":\"[Telemetry][Security\nSolution]
Index metadata collector
(#194004)\\n\\n##\nSummary\\r\\n\\r\\nImplements a security_solution
task scheduled to run once\na day to\\r\\ncollect the following
information:\\r\\n\\r\\n1. Datastreams\nstats\\r\\n2. Indices
stats\\r\\n3. ILMs stats\\r\\n4. ILM configs\\r\\n\\r\\nThe\ntask allows
a runtime configuration to limit the number of\nindices\\r\\nand data
streams to analyze or event to disable the
feature\nentirely.\\r\\n\\r\\nOnce the data is gathered, the task sends
it as EBT\nevents.\\r\\n\\r\\n---------\\r\\n\\r\\nCo-authored-by:
kibanamachine\n<42973632+kibanamachine@users.noreply.github.com>\\r\\nCo-authored-by:\nElastic
Machine\n<elasticmachine@users.noreply.github.com>\",\"sha\":\"36b344a4c58a3d78a892288e0eef71e9ff163b9d\"}},\"sourceBranch\":\"main\",\"suggestedTargetBranches\":[],\"targetPullRequestStates\":[{\"branch\":\"main\",\"label\":\"v9.0.0\",\"labelRegex\":\"^v9.0.0$\",\"isSourceBranch\":true,\"state\":\"MERGED\",\"url\":\"https://github.com/elastic/kibana/pull/194004\",\"number\":194004,\"mergeCommit\":{\"message\":\"[Telemetry][Security\nSolution]
Index metadata collector
(#194004)\\n\\n##\nSummary\\r\\n\\r\\nImplements a security_solution
task scheduled to run once\na day to\\r\\ncollect the following
information:\\r\\n\\r\\n1. Datastreams\nstats\\r\\n2. Indices
stats\\r\\n3. ILMs stats\\r\\n4. ILM configs\\r\\n\\r\\nThe\ntask allows
a runtime configuration to limit the number of\nindices\\r\\nand data
streams to analyze or event to disable the
feature\nentirely.\\r\\n\\r\\nOnce the data is gathered, the task sends
it as EBT\nevents.\\r\\n\\r\\n---------\\r\\n\\r\\nCo-authored-by:
kibanamachine\n<42973632+kibanamachine@users.noreply.github.com>\\r\\nCo-authored-by:\nElastic
Machine\n<elasticmachine@users.noreply.github.com>\",\"sha\":\"36b344a4c58a3d78a892288e0eef71e9ff163b9d\"}}]}]\nBACKPORT-->\n\nCo-authored-by:
Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"7a3de72bbe94a87bd972602b29212764f2619951","branchLabelMapping":{"^v8.16.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["backport"],"number":204311,"url":"https://github.com/elastic/kibana/pull/204311","mergeCommit":{"message":"[8.x]
[Telemetry][Security Solution] Index metadata collector (#194004)
(#204311)\n\n# Backport\n\nThis will backport the following commits from
`main` to `8.x`:\n- [[Telemetry][Security Solution] Index metadata
collector\n(#194004)](https://github.com/elastic/kibana/pull/194004)\n\n<!---
Backport version: 8.9.8 -->\n\n### Questions ?\nPlease refer to the
[Backport
tool\ndocumentation](https://github.com/sqren/backport)\n\n<!--BACKPORT
[{\"author\":{\"name\":\"Sebastián\nZaffarano\",\"email\":\"sebastian.zaffarano@elastic.co\"},\"sourceCommit\":{\"committedDate\":\"2024-12-13T18:31:03Z\",\"message\":\"[Telemetry][Security\nSolution]
Index metadata collector
(#194004)\\n\\n##\nSummary\\r\\n\\r\\nImplements a security_solution
task scheduled to run once\na day to\\r\\ncollect the following
information:\\r\\n\\r\\n1. Datastreams\nstats\\r\\n2. Indices
stats\\r\\n3. ILMs stats\\r\\n4. ILM configs\\r\\n\\r\\nThe\ntask allows
a runtime configuration to limit the number of\nindices\\r\\nand data
streams to analyze or event to disable the
feature\nentirely.\\r\\n\\r\\nOnce the data is gathered, the task sends
it as EBT\nevents.\\r\\n\\r\\n---------\\r\\n\\r\\nCo-authored-by:
kibanamachine\n<42973632+kibanamachine@users.noreply.github.com>\\r\\nCo-authored-by:\nElastic
Machine\n<elasticmachine@users.noreply.github.com>\",\"sha\":\"36b344a4c58a3d78a892288e0eef71e9ff163b9d\",\"branchLabelMapping\":{\"^v9.0.0$\":\"main\",\"^v8.18.0$\":\"8.x\",\"^v(\\\\d+).(\\\\d+).\\\\d+$\":\"$1.$2\"}},\"sourcePullRequest\":{\"labels\":[\"release_note:skip\",\"v9.0.0\",\"Team:\nSecuritySolution\",\"backport:all-open\",\"ci:cloud-deploy\"],\"number\":194004,\"url\":\"https://github.com/elastic/kibana/pull/194004\",\"mergeCommit\":{\"message\":\"[Telemetry][Security\nSolution]
Index metadata collector
(#194004)\\n\\n##\nSummary\\r\\n\\r\\nImplements a security_solution
task scheduled to run once\na day to\\r\\ncollect the following
information:\\r\\n\\r\\n1. Datastreams\nstats\\r\\n2. Indices
stats\\r\\n3. ILMs stats\\r\\n4. ILM configs\\r\\n\\r\\nThe\ntask allows
a runtime configuration to limit the number of\nindices\\r\\nand data
streams to analyze or event to disable the
feature\nentirely.\\r\\n\\r\\nOnce the data is gathered, the task sends
it as EBT\nevents.\\r\\n\\r\\n---------\\r\\n\\r\\nCo-authored-by:
kibanamachine\n<42973632+kibanamachine@users.noreply.github.com>\\r\\nCo-authored-by:\nElastic
Machine\n<elasticmachine@users.noreply.github.com>\",\"sha\":\"36b344a4c58a3d78a892288e0eef71e9ff163b9d\"}},\"sourceBranch\":\"main\",\"suggestedTargetBranches\":[],\"targetPullRequestStates\":[{\"branch\":\"main\",\"label\":\"v9.0.0\",\"labelRegex\":\"^v9.0.0$\",\"isSourceBranch\":true,\"state\":\"MERGED\",\"url\":\"https://github.com/elastic/kibana/pull/194004\",\"number\":194004,\"mergeCommit\":{\"message\":\"[Telemetry][Security\nSolution]
Index metadata collector
(#194004)\\n\\n##\nSummary\\r\\n\\r\\nImplements a security_solution
task scheduled to run once\na day to\\r\\ncollect the following
information:\\r\\n\\r\\n1. Datastreams\nstats\\r\\n2. Indices
stats\\r\\n3. ILMs stats\\r\\n4. ILM configs\\r\\n\\r\\nThe\ntask allows
a runtime configuration to limit the number of\nindices\\r\\nand data
streams to analyze or event to disable the
feature\nentirely.\\r\\n\\r\\nOnce the data is gathered, the task sends
it as EBT\nevents.\\r\\n\\r\\n---------\\r\\n\\r\\nCo-authored-by:
kibanamachine\n<42973632+kibanamachine@users.noreply.github.com>\\r\\nCo-authored-by:\nElastic
Machine\n<elasticmachine@users.noreply.github.com>\",\"sha\":\"36b344a4c58a3d78a892288e0eef71e9ff163b9d\"}}]}]\nBACKPORT-->\n\nCo-authored-by:
Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"7a3de72bbe94a87bd972602b29212764f2619951"}},"sourceBranch":"8.x","suggestedTargetBranches":[],"targetPullRequestStates":[]}]
BACKPORT-->

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Sebastián Zaffarano 2025-01-09 16:27:12 -03:00 committed by GitHub
parent 6dd7925c29
commit 81fd45e12a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 3100 additions and 136 deletions

View file

@ -82,6 +82,7 @@ disabled:
- x-pack/test/security_solution_api_integration/test_suites/detections_response/rules_management/rule_read/trial_license_complete_tier/configs/serverless.config.ts
- x-pack/test/security_solution_api_integration/test_suites/detections_response/rules_management/rule_read/basic_license_essentials_tier/configs/serverless.config.ts
- x-pack/test/security_solution_api_integration/test_suites/detections_response/telemetry/trial_license_complete_tier/configs/serverless.config.ts
- x-pack/test/security_solution_api_integration/test_suites/telemetry/configs/serverless.config.ts
- x-pack/test/security_solution_api_integration/test_suites/detections_response/user_roles/trial_license_complete_tier/configs/serverless.config.ts
- x-pack/test/security_solution_api_integration/test_suites/genai/nlp_cleanup_task/trial_license_complete_tier/configs/serverless.config.ts
- x-pack/test/security_solution_api_integration/test_suites/genai/nlp_cleanup_task/basic_license_essentials_tier/configs/serverless.config.ts

View file

@ -70,6 +70,7 @@ enabled:
- x-pack/test/security_solution_api_integration/test_suites/detections_response/rules_management/rule_read/trial_license_complete_tier/configs/ess.config.ts
- x-pack/test/security_solution_api_integration/test_suites/detections_response/rules_management/rule_read/basic_license_essentials_tier/configs/ess.config.ts
- x-pack/test/security_solution_api_integration/test_suites/detections_response/telemetry/trial_license_complete_tier/configs/ess.config.ts
- x-pack/test/security_solution_api_integration/test_suites/telemetry/configs/ess.config.ts
- x-pack/test/security_solution_api_integration/test_suites/detections_response/user_roles/trial_license_complete_tier/configs/ess.config.ts
- x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/configs/ess.config.ts
- x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/basic_license_essentials_tier/configs/ess.config.ts

View file

@ -0,0 +1,152 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import Path from 'path';
import axios from 'axios';
import { cloneDeep } from 'lodash';
import { telemetryConfiguration } from '../lib/telemetry/configuration';
import {
TaskManagerPlugin,
type TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server/plugin';
import {
setupTestServers,
removeFile,
mockAxiosPost,
DEFAULT_GET_ROUTES,
mockAxiosGet,
getRandomInt,
} from './lib/helpers';
import {
type TestElasticsearchUtils,
type TestKibanaUtils,
} from '@kbn/core-test-helpers-kbn-server';
import { Plugin as SecuritySolutionPlugin } from '../plugin';
import { getTelemetryTasks, runSoonConfigTask } from './lib/telemetry_helpers';
import type { SecurityTelemetryTask } from '../lib/telemetry/task';
jest.mock('axios');
const logFilePath = Path.join(__dirname, 'config.logs.log');
const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');
const securitySolutionStartSpy = jest.spyOn(SecuritySolutionPlugin.prototype, 'start');
const mockedAxiosGet = jest.spyOn(axios, 'get');
const mockedAxiosPost = jest.spyOn(axios, 'post');
const securitySolutionPlugin = jest.spyOn(SecuritySolutionPlugin.prototype, 'start');
describe('configuration', () => {
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
let taskManagerPlugin: TaskManagerStartContract;
let tasks: SecurityTelemetryTask[];
beforeAll(async () => {
await removeFile(logFilePath);
const servers = await setupTestServers(logFilePath);
esServer = servers.esServer;
kibanaServer = servers.kibanaServer;
expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;
expect(securitySolutionStartSpy).toHaveBeenCalledTimes(1);
tasks = getTelemetryTasks(securitySolutionStartSpy);
expect(securitySolutionPlugin).toHaveBeenCalledTimes(1);
});
afterAll(async () => {
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});
beforeEach(async () => {
jest.clearAllMocks();
mockAxiosPost(mockedAxiosPost);
});
afterEach(async () => {});
describe('configuration task', () => {
it('should keep default values when no new config was provided', async () => {
const before = cloneDeep(telemetryConfiguration);
await runSoonConfigTask(tasks, taskManagerPlugin);
expect(telemetryConfiguration).toEqual(before);
});
it('should update values with new manifest', async () => {
const expected = {
telemetry_max_buffer_size: getRandomInt(1, 100),
max_security_list_telemetry_batch: getRandomInt(1, 100),
max_endpoint_telemetry_batch: getRandomInt(1, 100),
max_detection_rule_telemetry_batch: getRandomInt(1, 100),
max_detection_alerts_batch: getRandomInt(1, 100),
use_async_sender: true,
pagination_config: {
max_page_size_bytes: getRandomInt(1, 100),
num_docs_to_sample: getRandomInt(1, 100),
},
sender_channels: {
default: {
buffer_time_span_millis: getRandomInt(1, 100),
inflight_events_threshold: getRandomInt(1, 100),
max_payload_size_bytes: getRandomInt(1, 100),
},
},
indices_metadata_config: {
indices_threshold: getRandomInt(1, 100),
datastreams_threshold: getRandomInt(1, 100),
max_prefixes: getRandomInt(1, 100),
max_group_size: getRandomInt(1, 100),
},
};
mockAxiosGet(mockedAxiosGet, [
...DEFAULT_GET_ROUTES,
[/.*telemetry-buffer-and-batch-sizes-v1.*/, { status: 200, data: cloneDeep(expected) }],
]);
await runSoonConfigTask(tasks, taskManagerPlugin);
expect(telemetryConfiguration.telemetry_max_buffer_size).toEqual(
expected.telemetry_max_buffer_size
);
expect(telemetryConfiguration.max_security_list_telemetry_batch).toEqual(
expected.max_security_list_telemetry_batch
);
expect(telemetryConfiguration.max_endpoint_telemetry_batch).toEqual(
expected.max_endpoint_telemetry_batch
);
expect(telemetryConfiguration.max_detection_rule_telemetry_batch).toEqual(
expected.max_detection_rule_telemetry_batch
);
expect(telemetryConfiguration.max_detection_alerts_batch).toEqual(
expected.max_detection_alerts_batch
);
expect(telemetryConfiguration.use_async_sender).toEqual(expected.use_async_sender);
expect(telemetryConfiguration.sender_channels).toEqual(expected.sender_channels);
expect(telemetryConfiguration.pagination_config).toEqual(expected.pagination_config);
expect(telemetryConfiguration.indices_metadata_config).toEqual(
expected.indices_metadata_config
);
});
});
});

View file

@ -10,6 +10,20 @@ import Util from 'util';
import type { ElasticsearchClient } from '@kbn/core/server';
import deepmerge from 'deepmerge';
import { createTestServers, createRootWithCorePlugins } from '@kbn/core-test-helpers-kbn-server';
export const DEFAULT_GET_ROUTES: Array<[RegExp, unknown]> = [
[new RegExp('.*/ping$'), { status: 200 }],
[
/.*kibana\/manifest\/artifacts.*/,
{
status: 200,
data: 'x-pack/plugins/security_solution/server/lib/telemetry/__mocks__/kibana-artifacts.zip',
},
],
];
export const DEFAULT_POST_ROUTES: Array<[RegExp, unknown]> = [[/.*/, { status: 200 }]];
const asyncUnlink = Util.promisify(Fs.unlink);
/**
@ -127,3 +141,35 @@ export function updateTimestamps(data: object[]): object[] {
return { ...d, '@timestamp': new Date(currentTimeMillis + (i + 1) * 100) };
});
}
export function mockAxiosPost(
postSpy: jest.SpyInstance,
routes: Array<[RegExp, unknown]> = DEFAULT_POST_ROUTES
) {
postSpy.mockImplementation(async (url: string) => {
for (const [route, returnValue] of routes) {
if (route.test(url)) {
return returnValue;
}
}
return { status: 404 };
});
}
export function mockAxiosGet(
getSpy: jest.SpyInstance,
routes: Array<[RegExp, unknown]> = DEFAULT_GET_ROUTES
) {
getSpy.mockImplementation(async (url: string) => {
for (const [route, returnValue] of routes) {
if (route.test(url)) {
return returnValue;
}
}
return { status: 404 };
});
}
export function getRandomInt(min: number, max: number): number {
return Math.floor(Math.random() * (max - min + 1)) + min;
}

View file

@ -24,12 +24,13 @@ import {
deleteExceptionListItem,
} from '@kbn/lists-plugin/server/services/exception_lists';
import { LEGACY_AGENT_POLICY_SAVED_OBJECT_TYPE } from '@kbn/fleet-plugin/common/constants';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { packagePolicyService } from '@kbn/fleet-plugin/server/services';
import { ENDPOINT_ARTIFACT_LISTS } from '@kbn/securitysolution-list-constants';
import { DETECTION_TYPE, NAMESPACE_TYPE } from '@kbn/lists-plugin/common/constants.mock';
import { bulkInsert, updateTimestamps } from './helpers';
import { bulkInsert, eventually, updateTimestamps } from './helpers';
import { TelemetryEventsSender } from '../../lib/telemetry/sender';
import type {
SecuritySolutionPluginStart,
@ -397,3 +398,24 @@ export function getTelemetryTaskType(task: SecurityTelemetryTask): string {
return '';
}
}
export async function runSoonConfigTask(
tasks: SecurityTelemetryTask[],
taskManagerPlugin: TaskManagerStartContract
) {
const configTaskType = 'security:telemetry-configuration';
const configTask = getTelemetryTask(tasks, configTaskType);
const runAfter = new Date();
await eventually(async () => {
await taskManagerPlugin.runSoon(configTask.getTaskId());
});
// wait until the task finishes
await eventually(async () => {
const hasRun = await taskManagerPlugin
.get(configTask.getTaskId())
.then((t) => new Date(t.state.lastExecutionTimestamp) > runAfter)
.catch(() => false);
expect(hasRun).toBe(true);
});
}

View file

@ -20,7 +20,14 @@ import {
TELEMETRY_CHANNEL_ENDPOINT_META,
} from '../lib/telemetry/constants';
import { eventually, setupTestServers, removeFile } from './lib/helpers';
import {
eventually,
setupTestServers,
removeFile,
mockAxiosGet,
mockAxiosPost,
DEFAULT_GET_ROUTES,
} from './lib/helpers';
import {
cleanupMockedAlerts,
cleanupMockedExceptionLists,
@ -37,6 +44,7 @@ import {
mockEndpointData,
getTelemetryReceiver,
mockPrebuiltRulesData,
runSoonConfigTask,
} from './lib/telemetry_helpers';
import {
@ -85,7 +93,21 @@ describe('telemetry tasks', () => {
beforeAll(async () => {
await removeFile(logFilePath);
const servers = await setupTestServers(logFilePath);
const servers = await setupTestServers(logFilePath, {
xpack: {
fleet: {
internal: {
registry: {
// Since `endpoint` is not available in EPR yet for
// kibana 9 (e.g., https://epr.elastic.co/search?package=endpoint&kibana.version=9.0.0)
// we need to ignore version checks
kibanaVersionCheckEnabled: false,
},
},
},
},
});
esServer = servers.esServer;
kibanaServer = servers.kibanaServer;
@ -121,7 +143,17 @@ describe('telemetry tasks', () => {
beforeEach(async () => {
jest.clearAllMocks();
mockAxiosGet();
mockAxiosPost(mockedAxiosPost);
mockAxiosGet(mockedAxiosGet, [
...DEFAULT_GET_ROUTES,
[
/.*telemetry-buffer-and-batch-sizes-v1.*/,
{
status: 200,
data: fakeBufferAndSizesConfigAsyncDisabled,
},
],
]);
deferred = [];
});
@ -195,21 +227,17 @@ describe('telemetry tasks', () => {
});
it('should use new sender when configured', async () => {
const configTaskType = 'security:telemetry-configuration';
const configTask = getTelemetryTask(tasks, configTaskType);
mockAxiosPost(mockedAxiosPost);
mockAxiosGet(fakeBufferAndSizesConfigAsyncEnabled);
await eventually(async () => {
await taskManagerPlugin.runSoon(configTask.getTaskId());
});
mockAxiosGet(mockedAxiosGet, [
...DEFAULT_GET_ROUTES,
[
/.*telemetry-buffer-and-batch-sizes-v1.*/,
{ status: 200, data: fakeBufferAndSizesConfigAsyncEnabled },
],
]);
// wait until the task finishes
await eventually(async () => {
const found = (await taskManagerPlugin.fetch()).docs.find(
(t) => t.taskType === configTaskType
);
expect(found).toBeFalsy();
});
await runSoonConfigTask(tasks, taskManagerPlugin);
const [task, started] = await mockAndScheduleDetectionRulesTask();
@ -225,13 +253,20 @@ describe('telemetry tasks', () => {
it('should update sender queue config', async () => {
const expectedConfig = fakeBufferAndSizesConfigWithQueues.sender_channels['task-metrics'];
const configTaskType = 'security:telemetry-configuration';
const configTask = getTelemetryTask(tasks, configTaskType);
mockAxiosGet(fakeBufferAndSizesConfigWithQueues);
await eventually(async () => {
await taskManagerPlugin.runSoon(configTask.getTaskId());
});
mockAxiosPost(mockedAxiosPost);
mockAxiosGet(mockedAxiosGet, [
...DEFAULT_GET_ROUTES,
[
/.*telemetry-buffer-and-batch-sizes-v1.*/,
{
status: 200,
data: fakeBufferAndSizesConfigWithQueues,
},
],
]);
await runSoonConfigTask(tasks, taskManagerPlugin);
await eventually(async () => {
/* eslint-disable dot-notation */
@ -825,31 +860,6 @@ describe('telemetry tasks', () => {
});
}
function mockAxiosGet(bufferConfig: unknown = fakeBufferAndSizesConfigAsyncDisabled) {
mockedAxiosPost.mockImplementation(
async (_url: string, _data?: unknown, _config?: AxiosRequestConfig<unknown> | undefined) => {
return { status: 200 };
}
);
mockedAxiosGet.mockImplementation(async (url: string) => {
if (url.startsWith(ENDPOINT_STAGING) && url.endsWith('ping')) {
return { status: 200 };
} else if (url.indexOf('kibana/manifest/artifacts') !== -1) {
return {
status: 200,
data: 'x-pack/plugins/security_solution/server/lib/telemetry/__mocks__/kibana-artifacts.zip',
};
} else if (url.indexOf('telemetry-buffer-and-batch-sizes-v1') !== -1) {
return {
status: 200,
data: bufferConfig,
};
}
return { status: 404 };
});
}
async function getTaskMetricsRequests(
task: SecurityTelemetryTask,
olderThan: number

View file

@ -15,6 +15,7 @@ import { getDetectionRulesPreview } from './utils/get_detecton_rules_preview';
import { getSecurityListsPreview } from './utils/get_security_lists_preview';
import { getEndpointPreview } from './utils/get_endpoint_preview';
import { getDiagnosticsPreview } from './utils/get_diagnostics_preview';
import { getIndicesMetadataPreview } from './utils/get_indices_metadata_preview';
export const telemetryDetectionRulesPreviewRoute = (
router: SecuritySolutionPluginRouter,
@ -62,12 +63,19 @@ export const telemetryDetectionRulesPreviewRoute = (
telemetrySender,
});
const indicesMetadata = await getIndicesMetadataPreview({
logger,
telemetryReceiver,
telemetrySender,
});
return response.ok({
body: {
detection_rules: detectionRules,
security_lists: securityLists,
endpoints,
diagnostics,
indices_metadata: indicesMetadata,
},
});
}

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import { PreviewTelemetryEventsSender } from '../../../../telemetry/preview_sender';
import type { ITelemetryReceiver } from '../../../../telemetry/receiver';
import { PreviewTaskMetricsService } from '../../../../telemetry/preview_task_metrics';
import type { ITelemetryEventsSender } from '../../../../telemetry/sender';
import { createTelemetryIndicesMetadataTaskConfig } from '../../../../telemetry/tasks/indices.metadata';
export const getIndicesMetadataPreview = async ({
logger,
telemetryReceiver,
telemetrySender,
}: {
logger: Logger;
telemetryReceiver: ITelemetryReceiver;
telemetrySender: ITelemetryEventsSender;
}): Promise<Array<{ eventType: string; eventData: object }>> => {
const taskExecutionPeriod = {
last: new Date(0).toISOString(),
current: new Date().toISOString(),
};
const taskSender = new PreviewTelemetryEventsSender(logger, telemetrySender);
const taskMetricsService = new PreviewTaskMetricsService(logger, taskSender);
const task = createTelemetryIndicesMetadataTaskConfig();
await task.runTask(
'indices-metadata-telemetry',
logger,
telemetryReceiver,
taskSender,
taskMetricsService,
taskExecutionPeriod
);
return taskSender.getEbtEventsSent();
};

View file

@ -8,7 +8,7 @@ import axios from 'axios';
import * as rx from 'rxjs';
import _, { cloneDeep } from 'lodash';
import type { Logger, LogMeta } from '@kbn/core/server';
import type { AnalyticsServiceSetup, EventTypeOpts, Logger, LogMeta } from '@kbn/core/server';
import type { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server';
import { type IUsageCounter } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counter';
import type { ITelemetryReceiver } from './receiver';
@ -17,7 +17,7 @@ import {
type QueueConfig,
type RetryConfig,
} from './async_sender.types';
import { TelemetryChannel, TelemetryCounter } from './types';
import { type Nullable, TelemetryChannel, TelemetryCounter } from './types';
import * as collections from './collections_helpers';
import { CachedSubject, retryOnError$ } from './rxjs_helpers';
import { SenderUtils } from './sender_helpers';
@ -55,6 +55,8 @@ export class AsyncTelemetryEventsSender implements IAsyncTelemetryEventsSender {
private telemetryUsageCounter?: IUsageCounter;
private senderUtils: SenderUtils | undefined;
private analytics: Nullable<AnalyticsServiceSetup>;
constructor(logger: Logger) {
this.logger = newTelemetryLogger(logger.get('telemetry_events.async_sender'));
}
@ -64,7 +66,8 @@ export class AsyncTelemetryEventsSender implements IAsyncTelemetryEventsSender {
fallbackQueueConfig: QueueConfig,
telemetryReceiver: ITelemetryReceiver,
telemetrySetup?: TelemetryPluginSetup,
telemetryUsageCounter?: IUsageCounter
telemetryUsageCounter?: IUsageCounter,
analytics?: AnalyticsServiceSetup
): void {
this.logger.l(`Setting up ${AsyncTelemetryEventsSender.name}`);
@ -77,6 +80,7 @@ export class AsyncTelemetryEventsSender implements IAsyncTelemetryEventsSender {
this.telemetryReceiver = telemetryReceiver;
this.telemetrySetup = telemetrySetup;
this.telemetryUsageCounter = telemetryUsageCounter;
this.analytics = analytics;
this.updateStatus(ServiceStatus.CONFIGURED);
}
@ -201,6 +205,13 @@ export class AsyncTelemetryEventsSender implements IAsyncTelemetryEventsSender {
}
}
public reportEBT<T>(eventTypeOpts: EventTypeOpts<T>, eventData: T): void {
if (!this.analytics) {
throw Error('analytics is unavailable');
}
this.analytics.reportEvent(eventTypeOpts.eventType, eventData as object);
}
// internal methods
private queue$(
upstream$: rx.Observable<Event>,

View file

@ -6,6 +6,7 @@
*/
import type { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server';
import { type IUsageCounter } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counter';
import type { AnalyticsServiceSetup, EventTypeOpts } from '@kbn/core-analytics-server';
import { type TelemetryChannel } from './types';
import type { ITelemetryReceiver } from './receiver';
@ -20,7 +21,8 @@ export interface IAsyncTelemetryEventsSender {
fallbackQueueConfig: QueueConfig,
telemetryReceiver: ITelemetryReceiver,
telemetrySetup?: TelemetryPluginSetup,
telemetryUsageCounter?: IUsageCounter
telemetryUsageCounter?: IUsageCounter,
analytics?: AnalyticsServiceSetup
) => void;
start: (telemetryStart?: TelemetryPluginStart) => void;
stop: () => Promise<void>;
@ -28,6 +30,7 @@ export interface IAsyncTelemetryEventsSender {
simulateSend: (channel: TelemetryChannel, events: unknown[]) => string[];
updateQueueConfig: (channel: TelemetryChannel, config: QueueConfig) => void;
updateDefaultQueueConfig: (config: QueueConfig) => void;
reportEBT: <T>(eventTypeOpts: EventTypeOpts<T>, eventData: T) => void;
}
/**

View file

@ -4,7 +4,14 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { chunked, chunkedBy } from './collections_helpers';
import { stagingIndices } from './__mocks__/staging_indices';
import {
type CommonPrefixesConfig,
chunked,
chunkedBy,
findCommonPrefixes,
chunkStringsByMaxLength,
} from './collections_helpers';
describe('telemetry.utils.chunked', () => {
it('should chunk simple case', async () => {
@ -79,3 +86,105 @@ describe('telemetry.utils.chunkedBy', () => {
expect(output).toEqual([['aaaa']]);
});
});
describe('telemetry.utils.findCommonPrefixes', () => {
it('should find common prefixes in simple case', async () => {
const indices = ['aaa', 'b', 'aa'];
const config: CommonPrefixesConfig = {
maxPrefixes: 10,
maxGroupSize: 10,
minPrefixSize: 1,
};
const output = findCommonPrefixes(indices, config);
expect(output).toHaveLength(2);
expect(output.find((v, _) => v.parts.length === 1 && v.parts[0] === 'a')?.indexCount).toEqual(
2
);
expect(output.find((v, _) => v.parts.length === 1 && v.parts[0] === 'b')?.indexCount).toEqual(
1
);
});
it('should find common prefixes with different minPrefixSize', async () => {
const indices = ['.ds-AA-0001', '.ds-AA-0002', '.ds-BB-0003'];
const config: CommonPrefixesConfig = {
maxPrefixes: 10,
maxGroupSize: 3,
minPrefixSize: 5,
};
const output = findCommonPrefixes(indices, config);
expect(output).toHaveLength(2);
expect(
output.find((v, _) => v.parts.length === 1 && v.parts[0] === '.ds-A')?.indexCount
).toEqual(2);
expect(
output.find((v, _) => v.parts.length === 1 && v.parts[0] === '.ds-B')?.indexCount
).toEqual(1);
});
it('should discard extra indices', async () => {
const indices = ['aaa', 'aaaaaa', 'aa'];
const config: CommonPrefixesConfig = {
maxPrefixes: 1,
maxGroupSize: 2,
minPrefixSize: 3,
};
const output = findCommonPrefixes(indices, config);
expect(output).toHaveLength(1);
expect(output.find((v, _) => v.parts.length === 1 && v.parts[0] === 'aaa')?.indexCount).toEqual(
2
);
});
it('should group many indices', async () => {
const indices = stagingIndices;
const config: CommonPrefixesConfig = {
maxPrefixes: 8,
maxGroupSize: 100,
minPrefixSize: 3,
};
const output = findCommonPrefixes(indices, config);
expect(output).toHaveLength(config.maxPrefixes);
expect(output.map((v, _) => v.indexCount).reduce((acc, i) => acc + i, 0)).toBe(indices.length);
});
});
describe('telemetry.utils.splitIndicesByNameLength', () => {
it('should chunk simple case', async () => {
const input = ['aa', 'b', 'ccc', 'ddd'];
const output = chunkStringsByMaxLength(input, 5);
expect(output).toEqual([['aa', 'b'], ['ccc'], ['ddd']]);
});
it('should chunk with remainder', async () => {
const input = ['aaa', 'b'];
const output = chunkStringsByMaxLength(input, 10);
expect(output).toEqual([['aaa', 'b']]);
});
it('should chunk with empty list', async () => {
const input: string[] = [];
const output = chunkStringsByMaxLength(input, 3);
expect(output).toEqual([]);
});
it('should chunk with single element smaller than max weight', async () => {
const input = ['aa'];
const output = chunkStringsByMaxLength(input, 3);
expect(output).toEqual([['aa']]);
});
it('should chunk with single element bigger than max weight', async () => {
const input = ['aaaa', 'bb'];
const output = chunkStringsByMaxLength(input, 4);
expect(output).toEqual([['aaaa'], ['bb']]);
});
});

View file

@ -63,3 +63,153 @@ class Chunked<T> {
return this.chunks.filter((chunk) => chunk.length > 0);
}
}
export interface CommonPrefixesConfig {
maxPrefixes: number;
maxGroupSize: number;
minPrefixSize: number;
}
interface TrieNode {
char: string;
prefix: string;
children: { [key: string]: TrieNode };
count: number;
isEnd: boolean;
id: number;
}
interface Group {
parts: string[];
indexCount: number;
}
function newTrieNode(char: string = '', prefix: string = '', id: number = 0): TrieNode {
return {
char,
children: {},
count: 0,
id,
isEnd: false,
prefix,
};
}
/**
* Finds and groups common prefixes from a list of strings.
*
* @param {string[]} indices - An array of strings from which common prefixes will be extracted.
* @param {CommonPrefixesConfig} config - A configuration object that defines the rules for grouping.
*
* The `config` object contains the following properties:
* - maxGroupSize {number}: The maximum number of indices allowed in a group.
* - maxPrefixes {number}: The maximum number of prefix groups to return.
* - minPrefixSize {number}: The minimum length of a prefix required to form a group. It avoid cases like returning
* a single character prefix, e.g., ['.ds-...1', '.ds-....2', ....] -> returns a single group '.'
*
* @returns {Group[]} - An array of groups where each group contains a list of prefix parts and the count of indices that share that prefix.
*
* Example usage:
*
* ```typescript
* const indices = ['apple', 'appetizer', 'application', 'banana', 'band', 'bandage'];
* const config = {
* maxGroupSize: 5,
* maxPrefixes: 3,
* minPrefixSize: 3
* };
*
* const result = findCommonPrefixes(indices, config);
* //result = [
* // { parts: [ 'ban' ], indexCount: 3 },
* // { parts: [ 'app' ], indexCount: 3 }
* //]
* ```
*/
export function findCommonPrefixes(indices: string[], config: CommonPrefixesConfig): Group[] {
const idCounter = function* (): Generator<number, number, number> {
let id = 0;
while (true) {
yield id++;
}
};
const idGen = idCounter();
const root = newTrieNode('', '', idGen.next().value);
for (const index of indices) {
let node = root;
node.count++;
for (const char of index) {
if (!node.children[char]) {
node.children[char] = newTrieNode(char, node.prefix + char, idGen.next().value);
}
node = node.children[char];
node.count++;
}
node.isEnd = true;
}
const nodes = [root];
const prefixes: Group[] = [];
while (nodes.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const node = nodes.pop()!;
if (
(node.count <= config.maxGroupSize && node.prefix.length >= config.minPrefixSize) ||
(Object.keys(node.children).length === 0 && node.prefix.length >= config.minPrefixSize)
) {
const group: Group = {
parts: [node.prefix],
indexCount: node.count,
};
prefixes.push(group);
} else {
for (const child of Object.values(node.children)) {
nodes.push(child);
}
}
}
if (prefixes.length > config.maxPrefixes) {
prefixes.sort((a, b) => a.indexCount - b.indexCount);
while (prefixes.length > config.maxPrefixes) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const g1 = prefixes.shift()!;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const g2 = prefixes.shift()!;
const mergedGroup: Group = {
parts: g1.parts.concat(g2.parts),
indexCount: g1.indexCount + g2.indexCount,
};
prefixes.push(mergedGroup);
prefixes.sort((a, b) => a.indexCount - b.indexCount);
}
}
return prefixes;
}
/**
* Splits an array of strings into chunks where the total length of strings in each chunk
* does not exceed the specified `maxLength`.
*
* @param strings - An array of strings to be chunked.
* @param maxLength - The maximum total length allowed for strings in each chunk. Defaults to 1024.
* @returns A two-dimensional array where each inner array is a chunk of strings.
*
* @example
* ```typescript
* const strings = ["hello", "world", "this", "is", "a", "test"];
* const chunks = chunkStringsByMaxLength(strings, 10);
* console.log(chunks);
* // Output: [["hello", "world"], ["this", "is"], ["a", "test"]]
* ```
*/
export function chunkStringsByMaxLength(strings: string[], maxLength: number = 3072): string[][] {
// plus 1 for the comma separator
return chunkedBy(strings, maxLength, (index) => index.length + 1);
}

View file

@ -6,7 +6,11 @@
*/
import os from 'os';
import type { PaginationConfiguration, TelemetrySenderChannelConfiguration } from './types';
import type {
IndicesMetadataConfiguration,
PaginationConfiguration,
TelemetrySenderChannelConfiguration,
} from './types';
class TelemetryConfigurationDTO {
private readonly DEFAULT_TELEMETRY_MAX_BUFFER_SIZE = 100;
@ -21,6 +25,15 @@ class TelemetryConfigurationDTO {
max_page_size_bytes: Math.min(os.totalmem() * 0.02, 80 * 1024 * 1024),
num_docs_to_sample: 10,
};
private readonly DEFAULT_INDICES_METADATA_CONFIG = {
indices_threshold: 10000,
datastreams_threshold: 1000,
max_prefixes: 10, // @deprecated
max_group_size: 100, // @deprecated
min_group_size: 5, // @deprecated
};
private _telemetry_max_buffer_size = this.DEFAULT_TELEMETRY_MAX_BUFFER_SIZE;
private _max_security_list_telemetry_batch = this.DEFAULT_MAX_SECURITY_LIST_TELEMETRY_BATCH;
private _max_endpoint_telemetry_batch = this.DEFAULT_MAX_ENDPOINT_TELEMETRY_BATCH;
@ -31,6 +44,8 @@ class TelemetryConfigurationDTO {
[key: string]: TelemetrySenderChannelConfiguration;
} = this.DEFAULT_SENDER_CHANNELS;
private _pagination_config: PaginationConfiguration = this.DEFAULT_PAGINATION_CONFIG;
private _indices_metadata_config: IndicesMetadataConfiguration =
this.DEFAULT_INDICES_METADATA_CONFIG;
public get telemetry_max_buffer_size(): number {
return this._telemetry_max_buffer_size;
@ -96,6 +111,14 @@ class TelemetryConfigurationDTO {
return this._pagination_config;
}
public set indices_metadata_config(paginationConfiguration: IndicesMetadataConfiguration) {
this._indices_metadata_config = paginationConfiguration;
}
public get indices_metadata_config(): IndicesMetadataConfiguration {
return this._indices_metadata_config;
}
public resetAllToDefault() {
this._telemetry_max_buffer_size = this.DEFAULT_TELEMETRY_MAX_BUFFER_SIZE;
this._max_security_list_telemetry_batch = this.DEFAULT_MAX_SECURITY_LIST_TELEMETRY_BATCH;
@ -104,6 +127,7 @@ class TelemetryConfigurationDTO {
this._max_detection_alerts_batch = this.DEFAULT_MAX_DETECTION_ALERTS_BATCH;
this._sender_channels = this.DEFAULT_SENDER_CHANNELS;
this._pagination_config = this.DEFAULT_PAGINATION_CONFIG;
this._indices_metadata_config = this.DEFAULT_INDICES_METADATA_CONFIG;
}
}

View file

@ -11,6 +11,7 @@ import type {
ResponseActionsApiCommandNames,
} from '../../../../common/endpoint/service/response_actions/constants';
import type { BulkUpsertAssetCriticalityRecordsResponse } from '../../../../common/api/entity_analytics';
import type { DataStreams, IlmPolicies, IlmsStats, IndicesStats } from '../indices.metadata.types';
export const RISK_SCORE_EXECUTION_SUCCESS_EVENT: EventTypeOpts<{
scoresWritten: number;
@ -271,6 +272,241 @@ export const ALERT_SUPPRESSION_EVENT: EventTypeOpts<{
},
};
export const TELEMETRY_DATA_STREAM_EVENT: EventTypeOpts<DataStreams> = {
eventType: 'telemetry_data_stream_event',
schema: {
items: {
type: 'array',
items: {
properties: {
datastream_name: {
type: 'keyword',
_meta: { description: 'Name of the data stream' },
},
indices: {
type: 'array',
items: {
properties: {
index_name: { type: 'date', _meta: { description: 'Index name' } },
ilm_policy: { type: 'date', _meta: { optional: true, description: 'ILM policy' } },
},
},
_meta: { optional: true, description: 'Indices associated with the data stream' },
},
},
},
_meta: { description: 'Datastreams' },
},
},
};
export const TELEMETRY_INDEX_STATS_EVENT: EventTypeOpts<IndicesStats> = {
eventType: 'telemetry_index_stats_event',
schema: {
items: {
type: 'array',
items: {
properties: {
index_name: {
type: 'keyword',
_meta: { description: 'The name of the index being monitored.' },
},
query_total: {
type: 'long',
_meta: {
optional: true,
description: 'The total number of search queries executed on the index.',
},
},
query_time_in_millis: {
type: 'long',
_meta: {
optional: true,
description:
'The total time spent on query execution across all search requests, measured in milliseconds.',
},
},
docs_count: {
type: 'long',
_meta: {
optional: true,
description: 'The total number of documents currently stored in the index.',
},
},
docs_deleted: {
type: 'long',
_meta: {
optional: true,
description:
'The total number of documents that have been marked as deleted in the index.',
},
},
docs_total_size_in_bytes: {
type: 'long',
_meta: {
optional: true,
description:
'The total size, in bytes, of all documents stored in the index, including storage overhead.',
},
},
},
},
_meta: { description: 'Datastreams' },
},
},
};
export const TELEMETRY_ILM_POLICY_EVENT: EventTypeOpts<IlmPolicies> = {
eventType: 'telemetry_ilm_policy_event',
schema: {
items: {
type: 'array',
items: {
properties: {
policy_name: {
type: 'keyword',
_meta: { description: 'The name of the ILM policy.' },
},
modified_date: {
type: 'date',
_meta: { description: 'The date when the ILM policy was last modified.' },
},
phases: {
properties: {
cold: {
properties: {
min_age: {
type: 'text',
_meta: {
description:
'The minimum age before the index transitions to the "cold" phase.',
},
},
},
_meta: {
optional: true,
description:
'Configuration settings for the "cold" phase of the ILM policy, applied when data is infrequently accessed.',
},
},
delete: {
properties: {
min_age: {
type: 'text',
_meta: {
description:
'The minimum age before the index transitions to the "delete" phase.',
},
},
},
_meta: {
optional: true,
description:
'Configuration settings for the "delete" phase of the ILM policy, specifying when the index should be removed.',
},
},
frozen: {
properties: {
min_age: {
type: 'text',
_meta: {
description:
'The minimum age before the index transitions to the "frozen" phase.',
},
},
},
_meta: {
optional: true,
description:
'Configuration settings for the "frozen" phase of the ILM policy, where data is fully searchable but stored with a reduced resource footprint.',
},
},
hot: {
properties: {
min_age: {
type: 'text',
_meta: {
description:
'The minimum age before the index transitions to the "hot" phase.',
},
},
},
_meta: {
optional: true,
description:
'Configuration settings for the "hot" phase of the ILM policy, applied to actively written and queried data.',
},
},
warm: {
properties: {
min_age: {
type: 'text',
_meta: {
description:
'The minimum age before the index transitions to the "warm" phase.',
},
},
},
_meta: {
optional: true,
description:
'Configuration settings for the "warm" phase of the ILM policy, used for read-only data that is less frequently accessed.',
},
},
},
_meta: {
description:
'The different phases of the ILM policy that define how the index is managed over time.',
},
},
},
},
_meta: { description: 'Datastreams' },
},
},
};
export const TELEMETRY_ILM_STATS_EVENT: EventTypeOpts<IlmsStats> = {
eventType: 'telemetry_ilm_stats_event',
schema: {
items: {
type: 'array',
items: {
properties: {
index_name: {
type: 'keyword',
_meta: { description: 'The name of the index currently managed by the ILM policy.' },
},
phase: {
type: 'keyword',
_meta: {
optional: true,
description:
'The current phase of the ILM policy that the index is in (e.g., hot, warm, cold, frozen, or delete).',
},
},
age: {
type: 'text',
_meta: {
optional: true,
description:
'The age of the index since its creation, indicating how long it has existed.',
},
},
policy_name: {
type: 'keyword',
_meta: {
optional: true,
description: 'The name of the ILM policy applied to this index.',
},
},
},
},
_meta: { description: 'Datastreams' },
},
},
};
interface CreateAssetCriticalityProcessedFileEvent {
result?: BulkUpsertAssetCriticalityRecordsResponse['stats'];
startTime: Date;
@ -457,4 +693,8 @@ export const events = [
ENTITY_ENGINE_RESOURCE_INIT_FAILURE_EVENT,
ENTITY_ENGINE_INITIALIZATION_EVENT,
ENTITY_STORE_USAGE_EVENT,
TELEMETRY_DATA_STREAM_EVENT,
TELEMETRY_ILM_POLICY_EVENT,
TELEMETRY_ILM_STATS_EVENT,
TELEMETRY_INDEX_STATS_EVENT,
];

View file

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { DateTime } from '@elastic/elasticsearch/lib/api/types';
import type { Nullable } from './types';
export interface IlmPolicies {
items: IlmPolicy[];
}
export interface IlmPolicy {
policy_name: string;
modified_date: DateTime;
phases: IlmPhases;
}
export interface IlmPhases {
cold: Nullable<IlmPhase>;
delete: Nullable<IlmPhase>;
frozen: Nullable<IlmPhase>;
hot: Nullable<IlmPhase>;
warm: Nullable<IlmPhase>;
}
export interface IlmPhase {
min_age: string;
}
export interface IlmsStats {
items: IlmStats[];
}
export interface IlmStats {
index_name: string;
phase?: string;
age?: string;
policy_name?: string;
}
export interface IndicesStats {
items: IndexStats[];
}
export interface IndexStats {
index_name: string;
query_total?: number;
query_time_in_millis?: number;
docs_count?: number;
docs_deleted?: number;
docs_total_size_in_bytes?: number;
}
export interface Index {
index_name: string;
ilm_policy?: string;
}
export interface DataStreams {
items: DataStream[];
}
export interface DataStream {
datastream_name: string;
indices?: Index[];
}

View file

@ -7,7 +7,7 @@
import type { AxiosInstance, AxiosResponse } from 'axios';
import axios, { AxiosHeaders } from 'axios';
import type { Logger } from '@kbn/core/server';
import type { EventTypeOpts, Logger } from '@kbn/core/server';
import type { TelemetryPluginStart, TelemetryPluginSetup } from '@kbn/telemetry-plugin/server';
import type { UsageCounter } from '@kbn/usage-collection-plugin/server';
@ -37,6 +37,9 @@ export class PreviewTelemetryEventsSender implements ITelemetryEventsSender {
/** Last sent message */
private sentMessages: string[] = [];
/** Last sent EBT events */
private ebtEventsSent: Array<{ eventType: string; eventData: object }> = [];
/** Logger for this class */
private logger: Logger;
@ -87,6 +90,10 @@ export class PreviewTelemetryEventsSender implements ITelemetryEventsSender {
return this.sentMessages;
}
public getEbtEventsSent(): Array<{ eventType: string; eventData: object }> {
return this.ebtEventsSent;
}
public setup(
telemetryReceiver: ITelemetryReceiver,
telemetrySetup?: TelemetryPluginSetup,
@ -174,4 +181,12 @@ export class PreviewTelemetryEventsSender implements ITelemetryEventsSender {
public updateDefaultQueueConfig(config: QueueConfig): void {
this.composite.updateDefaultQueueConfig(config);
}
public reportEBT<T>(eventTypeOpts: EventTypeOpts<T>, eventData: T): void {
this.ebtEventsSent.push({
eventType: eventTypeOpts.eventType,
eventData: eventData as object,
});
this.composite.reportEBT(eventTypeOpts, eventData);
}
}

View file

@ -18,6 +18,7 @@ import type {
} from '@kbn/core/server';
import type {
AggregationsAggregate,
IlmExplainLifecycleRequest,
OpenPointInTimeResponse,
SearchRequest,
SearchResponse,
@ -38,6 +39,10 @@ import type {
SearchHit,
SearchRequest as ESSearchRequest,
SortResults,
IndicesGetDataStreamRequest,
IndicesStatsRequest,
IlmGetLifecycleRequest,
IndicesGetRequest,
} from '@elastic/elasticsearch/lib/api/types';
import type { TransportResult } from '@elastic/elasticsearch';
import type { AgentPolicy, Installation } from '@kbn/fleet-plugin/common';
@ -87,6 +92,16 @@ import { ENDPOINT_METRICS_INDEX } from '../../../common/constants';
import { PREBUILT_RULES_PACKAGE_NAME } from '../../../common/detection_engine/constants';
import { DEFAULT_DIAGNOSTIC_INDEX } from './constants';
import type { TelemetryLogger } from './telemetry_logger';
import type {
DataStream,
IlmPhase,
IlmPhases,
IlmPolicy,
IlmStats,
Index,
IndexStats,
} from './indices.metadata.types';
import { chunkStringsByMaxLength } from './collections_helpers';
export interface ITelemetryReceiver {
start(
@ -238,6 +253,12 @@ export interface ITelemetryReceiver {
setMaxPageSizeBytes(bytes: number): void;
setNumDocsToSample(n: number): void;
getIndices(): Promise<string[]>;
getDataStreams(): Promise<DataStream[]>;
getIndicesStats(indices: string[]): AsyncGenerator<IndexStats, void, unknown>;
getIlmsStats(indices: string[]): AsyncGenerator<IlmStats, void, unknown>;
getIlmsPolicies(ilms: string[]): AsyncGenerator<IlmPolicy, void, unknown>;
}
export class TelemetryReceiver implements ITelemetryReceiver {
@ -532,7 +553,6 @@ export class TelemetryReceiver implements ITelemetryReceiver {
const buckets = endpointMetadataResponse?.aggregations?.endpoint_metadata?.buckets ?? [];
return buckets.reduce((cache, endpointAgentId) => {
// const id = endpointAgentId.latest_metadata.hits.hits[0]._id;
const doc = endpointAgentId.latest_metadata.hits.hits[0]._source;
cache.set(endpointAgentId.key, doc);
return cache;
@ -541,7 +561,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
}
public async *fetchDiagnosticAlertsBatch(executeFrom: string, executeTo: string) {
this.logger.debug('Searching diagnostic alerts', {
this.logger.l('Searching diagnostic alerts', {
from: executeFrom,
to: executeTo,
} as LogMeta);
@ -585,10 +605,10 @@ export class TelemetryReceiver implements ITelemetryReceiver {
fetchMore = false;
}
this.logger.debug('Diagnostic alerts to return', { numOfHits } as LogMeta);
this.logger.l('Diagnostic alerts to return', { numOfHits } as LogMeta);
fetchMore = numOfHits > 0 && numOfHits < telemetryConfiguration.telemetry_max_buffer_size;
} catch (e) {
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta);
fetchMore = false;
}
@ -761,7 +781,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
executeFrom: string,
executeTo: string
) {
this.logger.debug('Searching prebuilt rule alerts from', {
this.logger.l('Searching prebuilt rule alerts from', {
executeFrom,
executeTo,
} as LogMeta);
@ -899,14 +919,14 @@ export class TelemetryReceiver implements ITelemetryReceiver {
pitId = response?.pit_id;
}
this.logger.debug('Prebuilt rule alerts to return', { alerts: alerts.length } as LogMeta);
this.logger.l('Prebuilt rule alerts to return', { alerts: alerts.length } as LogMeta);
yield alerts;
}
} catch (e) {
// to keep backward compatibility with the previous implementation, silent return
// once we start using `paginate` this error should be managed downstream
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta);
return;
} finally {
await this.closePointInTime(pitId);
@ -930,10 +950,10 @@ export class TelemetryReceiver implements ITelemetryReceiver {
try {
await this.esClient().closePointInTime({ id: pitId });
} catch (error) {
this.logger.l('Error trying to close point in time', {
this.logger.warn('Error trying to close point in time', {
pit: pitId,
error: JSON.stringify(error),
});
error_message: error.message,
} as LogMeta);
}
}
@ -1019,7 +1039,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
fetchMore = numOfHits > 0;
} catch (e) {
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta);
fetchMore = false;
}
@ -1034,11 +1054,11 @@ export class TelemetryReceiver implements ITelemetryReceiver {
try {
await this.esClient().closePointInTime({ id: pitId });
} catch (error) {
this.logger.l('Error trying to close point in time', {
this.logger.warn('Error trying to close point in time', {
pit: pitId,
error: JSON.stringify(error),
error_message: error.message,
keepAlive,
});
} as LogMeta);
}
this.logger.l('Timeline alerts to return', { alerts: alertsToReturn.length });
@ -1232,7 +1252,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
return ret.license;
} catch (err) {
this.logger.l('failed retrieving license', { error: JSON.stringify(err) });
this.logger.warn('failed retrieving license', { error_message: err.message } as LogMeta);
return undefined;
}
}
@ -1293,7 +1313,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
yield data;
} while (esQuery.search_after !== undefined);
} catch (e) {
this.logger.l('Error running paginated query', { error: JSON.stringify(e) });
this.logger.warn('Error running paginated query', { error_message: e.message } as LogMeta);
throw e;
} finally {
await this.closePointInTime(pit.id);
@ -1320,4 +1340,198 @@ export class TelemetryReceiver implements ITelemetryReceiver {
}
return this._esClient;
}
public async getIndices(): Promise<string[]> {
const es = this.esClient();
this.logger.l('Fetching indices');
const request: IndicesGetRequest = {
index: '*',
expand_wildcards: ['open', 'hidden'],
filter_path: ['*.settings.index.provided_name'],
};
return es.indices
.get(request)
.then((indices) => Array.from(Object.keys(indices)))
.catch((error) => {
this.logger.warn('Error fetching indices', { error_message: error } as LogMeta);
throw error;
});
}
public async getDataStreams(): Promise<DataStream[]> {
const es = this.esClient();
this.logger.l('Fetching datstreams');
const request: IndicesGetDataStreamRequest = {
name: '*',
expand_wildcards: ['open', 'hidden'],
filter_path: ['data_streams.name', 'data_streams.indices'],
};
return es.indices
.getDataStream(request)
.then((response) =>
response.data_streams.map((ds) => {
return {
datastream_name: ds.name,
indices:
ds.indices?.map((index) => {
return {
index_name: index.index_name,
ilm_policy: index.ilm_policy,
} as Index;
}) ?? [],
} as DataStream;
})
)
.catch((error) => {
this.logger.warn('Error fetching datastreams', { error_message: error } as LogMeta);
throw error;
});
}
public async *getIndicesStats(indices: string[]) {
const es = this.esClient();
this.logger.l('Fetching indices stats');
const groupedIndices = chunkStringsByMaxLength(indices);
this.logger.l('Splitted indices into groups', {
groups: groupedIndices.length,
indices: indices.length,
} as LogMeta);
for (const group of groupedIndices) {
const request: IndicesStatsRequest = {
index: group,
level: 'indices',
metric: ['docs', 'search', 'store'],
expand_wildcards: ['open', 'hidden'],
filter_path: [
'indices.*.total.search.query_total',
'indices.*.total.search.query_time_in_millis',
'indices.*.total.docs.count',
'indices.*.total.docs.deleted',
'indices.*.total.store.size_in_bytes',
],
};
try {
const response = await es.indices.stats(request);
for (const [indexName, stats] of Object.entries(response.indices ?? {})) {
yield {
index_name: indexName,
query_total: stats.total?.search?.query_total,
query_time_in_millis: stats.total?.search?.query_time_in_millis,
docs_count: stats.total?.docs?.count,
docs_deleted: stats.total?.docs?.deleted,
docs_total_size_in_bytes: stats.total?.store?.size_in_bytes,
} as IndexStats;
}
} catch (error) {
this.logger.warn('Error fetching indices stats', { error_message: error } as LogMeta);
throw error;
}
}
}
public async *getIlmsStats(indices: string[]) {
const es = this.esClient();
const groupedIndices = chunkStringsByMaxLength(indices);
this.logger.l('Splitted ilms into groups', {
groups: groupedIndices.length,
indices: indices.length,
} as LogMeta);
for (const group of groupedIndices) {
const request: IlmExplainLifecycleRequest = {
index: group.join(','),
only_managed: false,
filter_path: ['indices.*.phase', 'indices.*.age', 'indices.*.policy'],
};
const data = await es.ilm.explainLifecycle(request);
try {
for (const [indexName, stats] of Object.entries(data.indices ?? {})) {
const entry = {
index_name: indexName,
phase: ('phase' in stats && stats.phase) || undefined,
age: ('age' in stats && stats.age) || undefined,
policy_name: ('policy' in stats && stats.policy) || undefined,
} as IlmStats;
yield entry;
}
} catch (error) {
this.logger.warn('Error fetching ilm stats', { error_message: error } as LogMeta);
throw error;
}
}
}
public async *getIlmsPolicies(ilms: string[]) {
const es = this.esClient();
const phase = (obj: unknown): Nullable<IlmPhase> => {
let value: Nullable<IlmPhase>;
if (obj !== null && obj !== undefined && typeof obj === 'object' && 'min_age' in obj) {
value = {
min_age: obj.min_age,
} as IlmPhase;
}
return value;
};
const groupedIlms = chunkStringsByMaxLength(ilms);
this.logger.l('Splitted ilms into groups', {
groups: groupedIlms.length,
ilms: ilms.length,
} as LogMeta);
for (const group of groupedIlms) {
this.logger.l('Fetching ilm policies');
const request: IlmGetLifecycleRequest = {
name: group.join(','),
filter_path: [
'*.policy.phases.cold.min_age',
'*.policy.phases.delete.min_age',
'*.policy.phases.frozen.min_age',
'*.policy.phases.hot.min_age',
'*.policy.phases.warm.min_age',
'*.modified_date',
],
};
const response = await es.ilm.getLifecycle(request);
try {
for (const [policyName, stats] of Object.entries(response ?? {})) {
yield {
policy_name: policyName,
modified_date: stats.modified_date,
phases: {
cold: phase(stats.policy.phases.cold),
delete: phase(stats.policy.phases.delete),
frozen: phase(stats.policy.phases.frozen),
hot: phase(stats.policy.phases.hot),
warm: phase(stats.policy.phases.warm),
} as IlmPhases,
} as IlmPolicy;
}
} catch (error) {
this.logger.warn('Error fetching ilm policies', {
error_message: error.message,
} as LogMeta);
throw error;
}
}
}
}

View file

@ -9,7 +9,7 @@ import { cloneDeep } from 'lodash';
import { URL } from 'url';
import { transformDataToNdjson } from '@kbn/securitysolution-utils';
import type { Logger, LogMeta } from '@kbn/core/server';
import type { EventTypeOpts, Logger, LogMeta } from '@kbn/core/server';
import type { TelemetryPluginStart, TelemetryPluginSetup } from '@kbn/telemetry-plugin/server';
import type { UsageCounter } from '@kbn/usage-collection-plugin/server';
import type { AxiosInstance } from 'axios';
@ -88,6 +88,11 @@ export interface ITelemetryEventsSender {
* Updates the default queue configuration.
*/
updateDefaultQueueConfig: (config: QueueConfig) => void;
/**
* Reports EBT events
*/
reportEBT: <T>(eventTypeOpts: EventTypeOpts<T>, eventData: T) => void;
}
export class TelemetryEventsSender implements ITelemetryEventsSender {
@ -270,12 +275,16 @@ export class TelemetryEventsSender implements ITelemetryEventsSender {
const telemetryUrl = await this.fetchTelemetryPingUrl();
const resp = await axios.get(telemetryUrl, { timeout: 3000 });
if (resp.status === 200) {
this.logger.l('[Security Telemetry] elastic telemetry services are reachable');
this.logger.debug('Elastic telemetry services are reachable');
return true;
}
return false;
} catch (_err) {
} catch (e) {
this.logger.warn('Error pinging telemetry services', {
error: e.message,
} as LogMeta);
return false;
}
}
@ -426,6 +435,10 @@ export class TelemetryEventsSender implements ITelemetryEventsSender {
this.getAsyncTelemetrySender().updateDefaultQueueConfig(config);
}
public reportEBT<T>(eventTypeOpts: EventTypeOpts<T>, eventData: T): void {
this.getAsyncTelemetrySender().reportEBT(eventTypeOpts, eventData);
}
private getAsyncTelemetrySender(): IAsyncTelemetryEventsSender {
if (!this.asyncTelemetrySender) {
throw new Error('Telemetry Sender V2 not initialized');

View file

@ -16,6 +16,7 @@ import {
createMockTaskMetrics,
createMockSecurityTelemetryTask,
} from './__mocks__';
import { newTelemetryLogger } from './helpers';
describe('test security telemetry task', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
@ -66,7 +67,7 @@ describe('test security telemetry task', () => {
expect(mockTelemetryTaskConfig.runTask).toHaveBeenCalledWith(
telemetryTask.getTaskId(),
logger,
newTelemetryLogger(logger.get('task')),
mockTelemetryReceiver,
mockTelemetryEventsSender,
mockTaskMetrics,

View file

@ -6,7 +6,7 @@
*/
import moment from 'moment';
import type { Logger } from '@kbn/core/server';
import type { Logger, LogMeta } from '@kbn/core/server';
import type {
ConcreteTaskInstance,
TaskManagerSetupContract,
@ -15,8 +15,9 @@ import type {
import type { ITelemetryReceiver } from './receiver';
import type { ITelemetryEventsSender } from './sender';
import type { ITaskMetricsService } from './task_metrics.types';
import { tlog } from './helpers';
import { stateSchemaByVersion, emptyState, type LatestTaskStateSchema } from './task_state';
import { newTelemetryLogger } from './helpers';
import { type TelemetryLogger } from './telemetry_logger';
export interface SecurityTelemetryTaskConfig {
type: string;
@ -49,7 +50,7 @@ export type LastExecutionTimestampCalculator = (
export class SecurityTelemetryTask {
private readonly config: SecurityTelemetryTaskConfig;
private readonly logger: Logger;
private readonly logger: TelemetryLogger;
private readonly sender: ITelemetryEventsSender;
private readonly receiver: ITelemetryReceiver;
private readonly taskMetricsService: ITaskMetricsService;
@ -62,7 +63,7 @@ export class SecurityTelemetryTask {
taskMetricsService: ITaskMetricsService
) {
this.config = config;
this.logger = logger;
this.logger = newTelemetryLogger(logger.get('task'));
this.sender = sender;
this.receiver = receiver;
this.taskMetricsService = taskMetricsService;
@ -122,7 +123,7 @@ export class SecurityTelemetryTask {
public start = async (taskManager: TaskManagerStartContract) => {
const taskId = this.getTaskId();
tlog(this.logger, `[task ${taskId}]: attempting to schedule`);
this.logger.debug('Attempting to schedule task', { taskId } as LogMeta);
try {
await taskManager.ensureScheduled({
id: taskId,
@ -135,30 +136,32 @@ export class SecurityTelemetryTask {
params: { version: this.config.version },
});
} catch (e) {
this.logger.error(`[task ${taskId}]: error scheduling task, received ${e.message}`);
this.logger.error('Error scheduling task', {
error: e.message,
} as LogMeta);
}
};
public runTask = async (taskId: string, executionPeriod: TaskExecutionPeriod) => {
tlog(this.logger, `[task ${taskId}]: attempting to run`);
this.logger.debug('Attempting to run', { taskId } as LogMeta);
if (taskId !== this.getTaskId()) {
tlog(this.logger, `[task ${taskId}]: outdated task`);
this.logger.info('outdated task', { taskId } as LogMeta);
return 0;
}
const isOptedIn = await this.sender.isTelemetryOptedIn();
if (!isOptedIn) {
tlog(this.logger, `[task ${taskId}]: telemetry is not opted-in`);
this.logger.info('Telemetry is not opted-in', { taskId } as LogMeta);
return 0;
}
const isTelemetryServicesReachable = await this.sender.isTelemetryServicesReachable();
if (!isTelemetryServicesReachable) {
tlog(this.logger, `[task ${taskId}]: cannot reach telemetry services`);
this.logger.info('Cannot reach telemetry services', { taskId } as LogMeta);
return 0;
}
tlog(this.logger, `[task ${taskId}]: running task`);
this.logger.debug('Running task', { taskId } as LogMeta);
return this.config.runTask(
taskId,
this.logger,

View file

@ -29,7 +29,11 @@ export class TaskMetricsService implements ITaskMetricsService {
public async end(trace: Trace, error?: Error): Promise<void> {
const event = this.createTaskMetric(trace, error);
this.logger.l(`Task ${event.name} complete. Task run took ${event.time_executed_in_ms}ms`);
this.logger.l('Task completed', {
task_name: event.name,
time_executed_in_ms: event.time_executed_in_ms,
error_message: event.error_message,
});
if (telemetryConfiguration.use_async_sender) {
this.sender.sendAsync(TelemetryChannel.TASK_METRICS, [event]);

View file

@ -51,7 +51,7 @@ export function createTelemetryConfigurationTaskConfig() {
const configArtifact = manifest.data as unknown as TelemetryConfiguration;
log.l('Got telemetry configuration artifact', {
artifact: configArtifact,
artifact: configArtifact ?? '<null>',
});
telemetryConfiguration.max_detection_alerts_batch =
@ -107,6 +107,11 @@ export function createTelemetryConfigurationTaskConfig() {
_receiver.setNumDocsToSample(configArtifact.pagination_config.num_docs_to_sample);
}
if (configArtifact.indices_metadata_config) {
log.l('Updating indices metadata configuration');
telemetryConfiguration.indices_metadata_config = configArtifact.indices_metadata_config;
}
await taskMetricsService.end(trace);
log.l('Updated TelemetryConfiguration', { configuration: telemetryConfiguration });

View file

@ -16,6 +16,7 @@ import { createTelemetryDiagnosticTimelineTaskConfig } from './timelines_diagnos
import { createTelemetryConfigurationTaskConfig } from './configuration';
import { telemetryConfiguration } from '../configuration';
import { createTelemetryFilterListArtifactTaskConfig } from './filterlists';
import { createTelemetryIndicesMetadataTaskConfig } from './indices.metadata';
export function createTelemetryTaskConfigs(): SecurityTelemetryTaskConfig[] {
return [
@ -30,5 +31,6 @@ export function createTelemetryTaskConfigs(): SecurityTelemetryTaskConfig[] {
createTelemetryDiagnosticTimelineTaskConfig(),
createTelemetryConfigurationTaskConfig(),
createTelemetryFilterListArtifactTaskConfig(),
createTelemetryIndicesMetadataTaskConfig(),
];
}

View file

@ -0,0 +1,192 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { LogMeta, Logger } from '@kbn/core/server';
import type { ITelemetryEventsSender } from '../sender';
import type { ITelemetryReceiver } from '../receiver';
import type { TaskExecutionPeriod } from '../task';
import type { ITaskMetricsService } from '../task_metrics.types';
import {
createUsageCounterLabel,
getPreviousDailyTaskTimestamp,
newTelemetryLogger,
} from '../helpers';
import {
TELEMETRY_DATA_STREAM_EVENT,
TELEMETRY_ILM_POLICY_EVENT,
TELEMETRY_ILM_STATS_EVENT,
TELEMETRY_INDEX_STATS_EVENT,
} from '../event_based/events';
import { telemetryConfiguration } from '../configuration';
import type {
DataStream,
DataStreams,
IlmPolicies,
IlmsStats,
IndicesStats,
} from '../indices.metadata.types';
import { TelemetryCounter } from '../types';
const COUNTER_LABELS = ['security_solution', 'indices-metadata'];
export function createTelemetryIndicesMetadataTaskConfig() {
const taskType = 'security:indices-metadata-telemetry';
return {
type: taskType,
title: 'Security Solution Telemetry Indices Metadata task',
interval: '24h',
timeout: '1m',
version: '1.0.0',
getLastExecutionTime: getPreviousDailyTaskTimestamp,
runTask: async (
taskId: string,
logger: Logger,
receiver: ITelemetryReceiver,
sender: ITelemetryEventsSender,
taskMetricsService: ITaskMetricsService,
taskExecutionPeriod: TaskExecutionPeriod
) => {
const mdc = { task_id: taskId, task_execution_period: taskExecutionPeriod };
const log = newTelemetryLogger(logger.get('indices-metadata'), mdc);
const trace = taskMetricsService.start(taskType);
const taskConfig = telemetryConfiguration.indices_metadata_config;
const publishDatastreamsStats = (stats: DataStream[]): number => {
const events: DataStreams = {
items: stats,
};
sender.reportEBT(TELEMETRY_DATA_STREAM_EVENT, events);
log.info(`Sent data streams`, { count: events.items.length } as LogMeta);
return events.items.length;
};
const publishIndicesStats = async (indices: string[]): Promise<number> => {
const indicesStats: IndicesStats = {
items: [],
};
for await (const stat of receiver.getIndicesStats(indices)) {
indicesStats.items.push(stat);
}
sender.reportEBT(TELEMETRY_INDEX_STATS_EVENT, indicesStats);
log.info(`Sent indices stats`, { count: indicesStats.items.length } as LogMeta);
return indicesStats.items.length;
};
const publishIlmStats = async (indices: string[]): Promise<Set<string>> => {
const ilmNames = new Set<string>();
const ilmsStats: IlmsStats = {
items: [],
};
for await (const stat of receiver.getIlmsStats(indices)) {
if (stat.policy_name !== undefined) {
ilmNames.add(stat.policy_name);
ilmsStats.items.push(stat);
}
}
sender.reportEBT(TELEMETRY_ILM_STATS_EVENT, ilmsStats);
log.info(`Sent ILM stats`, { count: ilmNames.size } as LogMeta);
return ilmNames;
};
const publishIlmPolicies = async (ilmNames: Set<string>): Promise<number> => {
const ilmPolicies: IlmPolicies = {
items: [],
};
for await (const policy of receiver.getIlmsPolicies(Array.from(ilmNames.values()))) {
ilmPolicies.items.push(policy);
}
sender.reportEBT(TELEMETRY_ILM_POLICY_EVENT, ilmPolicies);
log.info('Sent ILM policies', { count: ilmPolicies.items.length } as LogMeta);
return ilmPolicies.items.length;
};
const incrementCounter = (type: TelemetryCounter, name: string, value: number) => {
const telemetryUsageCounter = sender.getTelemetryUsageCluster();
telemetryUsageCounter?.incrementCounter({
counterName: createUsageCounterLabel(COUNTER_LABELS.concat(name)),
counterType: type,
incrementBy: value,
});
};
try {
// 1. Get cluster stats and list of indices and datastreams
const [indices, dataStreams] = await Promise.all([
receiver.getIndices(),
receiver.getDataStreams(),
]);
// 2. Publish datastreams stats
const dsCount = publishDatastreamsStats(
dataStreams.slice(0, taskConfig.datastreams_threshold)
);
incrementCounter(TelemetryCounter.DOCS_SENT, 'datastreams-stats', dsCount);
// 3. Get and publish indices stats
const indicesCount: number = await publishIndicesStats(
indices.slice(0, taskConfig.indices_threshold)
)
.then((count) => {
incrementCounter(TelemetryCounter.DOCS_SENT, 'indices-stats', count);
return count;
})
.catch((err) => {
log.warn(`Error getting indices stats`, { error: err.message } as LogMeta);
incrementCounter(TelemetryCounter.RUNTIME_ERROR, 'indices-stats', 1);
return 0;
});
// 4. Get ILM stats and publish them
const ilmNames = await publishIlmStats(indices.slice(0, taskConfig.indices_threshold))
.then((names) => {
incrementCounter(TelemetryCounter.DOCS_SENT, 'ilm-stats', names.size);
return names;
})
.catch((err) => {
log.warn(`Error getting ILM stats`, { error: err.message } as LogMeta);
incrementCounter(TelemetryCounter.RUNTIME_ERROR, 'ilm-stats', 1);
return new Set<string>();
});
// 5. Publish ILM policies
const policyCount = await publishIlmPolicies(ilmNames)
.then((count) => {
incrementCounter(TelemetryCounter.DOCS_SENT, 'ilm-policies', count);
return count;
})
.catch((err) => {
log.warn(`Error getting ILM policies`, { error: err.message } as LogMeta);
incrementCounter(TelemetryCounter.RUNTIME_ERROR, 'ilm-policies', 1);
return 0;
});
log.info(`Sent EBT events`, {
datastreams: dsCount,
ilms: ilmNames.size,
indices: indicesCount,
policies: policyCount,
} as LogMeta);
await taskMetricsService.end(trace);
return indicesCount;
} catch (err) {
log.warn(`Error running indices metadata task`, {
error: err.message,
} as LogMeta);
await taskMetricsService.end(trace, err);
return 0;
}
},
};
}

View file

@ -465,6 +465,15 @@ export interface TelemetryConfiguration {
[key: string]: TelemetrySenderChannelConfiguration;
};
pagination_config?: PaginationConfiguration;
indices_metadata_config?: IndicesMetadataConfiguration;
}
export interface IndicesMetadataConfiguration {
indices_threshold: number;
datastreams_threshold: number;
max_prefixes: number;
max_group_size: number;
min_group_size: number;
}
export interface PaginationConfiguration {

View file

@ -499,7 +499,8 @@ export class Plugin implements ISecuritySolutionPlugin {
DEFAULT_QUEUE_CONFIG,
this.telemetryReceiver,
plugins.telemetry,
this.telemetryUsageCounter
this.telemetryUsageCounter,
core.analytics
);
this.telemetryEventsSender.setup(

View file

@ -5,9 +5,10 @@
* 2.0.
*/
export * from './rules';
export * from './alerts';
export * from './delete_all_anomalies';
export * from './count_down_test';
export * from './delete_all_anomalies';
export * from './route_with_namespace';
export * from './rules';
export * from './tasks';
export * from './wait_for';

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './indices_metadata';
export * from './task_manager';

View file

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Client } from '@elastic/elasticsearch';
const DS_PREFIX = 'testing-datastream';
const ILM_PREFIX = 'testing-ilm';
export const randomDatastream = async (es: Client, policyName?: string): Promise<string> => {
const name = `${DS_PREFIX}-${Date.now()}`;
let settings = {};
if (policyName) {
settings = {
...settings,
'index.lifecycle.name': policyName,
};
}
const indexTemplateBody = {
index_patterns: [`${DS_PREFIX}-*`],
data_stream: {},
template: {
settings,
},
};
await es.indices.putIndexTemplate({
name: DS_PREFIX,
body: indexTemplateBody,
});
await es.indices.createDataStream({ name });
return name;
};
export const randomIlmPolicy = async (es: Client): Promise<string> => {
const name = `${ILM_PREFIX}-${Date.now()}`;
const policy = {
phases: {
hot: {
actions: {
rollover: {
max_size: '50gb',
max_age: '30d',
},
},
},
warm: {
min_age: '30d',
actions: {
forcemerge: {
max_num_segments: 1,
},
shrink: {
number_of_shards: 1,
},
allocate: {
number_of_replicas: 1,
},
},
},
delete: {
min_age: '90d',
actions: {
delete: {},
},
},
},
};
await es.ilm.putLifecycle({ name, policy });
return name;
};
export const ensureBackingIndices = async (dsName: string, count: number, es: Client) => {
const stats = await es.indices.dataStreamsStats({ name: dsName });
if (stats.data_streams.length !== 1) {
throw new Error('Data stream not found');
}
const current = stats.data_streams[0].backing_indices;
if (current < count) {
for (let i = current; i < count; i++) {
await es.indices.rollover({ alias: dsName });
}
} else if (current > count) {
throw new Error('Cannot reduce the number of backing indices');
}
};
export const cleanupDatastreams = async (es: Client) => {
await es.indices.deleteDataStream({ name: `${DS_PREFIX}*` });
};
export const cleanupPolicies = async (es: Client) => {
const policies = await es.ilm.getLifecycle({ name: `${ILM_PREFIX}*` });
await Promise.all(Object.entries(policies).map(([name, _]) => es.ilm.deleteLifecycle({ name })));
};

View file

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { TaskStatus } from '@kbn/task-manager-plugin/server';
import { KbnClient } from '@kbn/test';
import { ToolingLog } from '@kbn/tooling-log';
export const taskHasRun = async (taskId: string, kbn: KbnClient, after: Date): Promise<boolean> => {
const task = await kbn.savedObjects.get({
type: 'task',
id: taskId,
});
const runAt = new Date(task.attributes.runAt);
const status = task.attributes.status;
return runAt > after && status === TaskStatus.Idle;
};
export const launchTask = async (
taskId: string,
kbn: KbnClient,
logger: ToolingLog,
delayMillis: number = 1_000
): Promise<Date> => {
logger.info(`Launching task ${taskId}`);
const task = await kbn.savedObjects.get({
type: 'task',
id: taskId,
});
const runAt = new Date(Date.now() + delayMillis).toISOString();
await kbn.savedObjects.update({
type: 'task',
id: taskId,
attributes: {
...task.attributes,
runAt,
scheduledAt: runAt,
status: TaskStatus.Idle,
},
});
logger.info(`Task ${taskId} launched`);
return new Date(runAt);
};

View file

@ -163,6 +163,7 @@ export default function ({ getService }: FtrProviderContext) {
'security-solution-ea-asset-criticality-ecs-migration',
'security:endpoint-diagnostics',
'security:endpoint-meta-telemetry',
'security:indices-metadata-telemetry',
'security:telemetry-configuration',
'security:telemetry-detection-rules',
'security:telemetry-diagnostic-timelines',

View file

@ -5,6 +5,8 @@
* 2.0.
*/
import path from 'path';
import { CA_CERT_PATH } from '@kbn/dev-utils';
import { FtrConfigProviderContext, kbnTestConfig, kibanaTestUser } from '@kbn/test';
import { services as baseServices } from './services';
@ -85,6 +87,11 @@ export function createTestConfig(options: CreateTestConfigOptions, testFiles?: s
'riskScoringPersistence',
'riskScoringRoutesEnabled',
])}`,
`--plugin-path=${path.resolve(
__dirname,
'../../../../../test/analytics/plugins/analytics_ftr_helpers'
)}`,
'--xpack.task_manager.poll_interval=1000',
`--xpack.actions.preconfigured=${JSON.stringify(PRECONFIGURED_ACTION_CONNECTORS)}`,
...(ssl

View file

@ -5,12 +5,14 @@
* 2.0.
*/
import { KibanaEBTServerProvider } from '@kbn/test-suites-src/analytics/services/kibana_ebt';
import { SecuritySolutionESSUtils } from '../services/security_solution_ess_utils';
import { SpacesServiceProvider } from '../../../common/services/spaces';
import { services as essServices } from '../../../api_integration/services';
import { SecuritySolutionESSUtils } from '../services/security_solution_ess_utils';
export const services = {
...essServices,
spaces: SpacesServiceProvider,
securitySolutionUtils: SecuritySolutionESSUtils,
kibana_ebt_server: KibanaEBTServerProvider,
};

View file

@ -4,6 +4,8 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import path from 'path';
import { FtrConfigProviderContext } from '@kbn/test';
import { services } from './services';
import { PRECONFIGURED_ACTION_CONNECTORS } from '../shared';
@ -13,6 +15,7 @@ export interface CreateTestConfigOptions {
junit: { reportName: string };
kbnTestServerArgs?: string[];
kbnTestServerEnv?: Record<string, string>;
suiteTags?: { include?: string[]; exclude?: string[] };
}
export function createTestConfig(options: CreateTestConfigOptions) {
@ -22,6 +25,7 @@ export function createTestConfig(options: CreateTestConfigOptions) {
);
return {
...svlSharedConfig.getAll(),
suiteTags: options.suiteTags,
services: {
...services,
},
@ -32,6 +36,10 @@ export function createTestConfig(options: CreateTestConfigOptions) {
'--serverless=security',
`--xpack.actions.preconfigured=${JSON.stringify(PRECONFIGURED_ACTION_CONNECTORS)}`,
...(options.kbnTestServerArgs || []),
`--plugin-path=${path.resolve(
__dirname,
'../../../../../test/analytics/plugins/analytics_ftr_helpers'
)}`,
],
env: {
...svlSharedConfig.get('kbnTestServer.env'),

View file

@ -7,6 +7,7 @@
import { BsearchSecureService } from '@kbn/test-suites-serverless/shared/services/bsearch_secure';
import { services as serverlessServices } from '@kbn/test-suites-serverless/api_integration/services';
import { KibanaEBTServerProvider } from '@kbn/test-suites-src/analytics/services/kibana_ebt';
import { SpacesServiceProvider } from '../../../common/services/spaces';
import { SecuritySolutionServerlessUtils } from '../services/security_solution_serverless_utils';
import { SecuritySolutionServerlessSuperTest } from '../services/security_solution_serverless_supertest';
@ -17,4 +18,5 @@ export const services = {
secureBsearch: BsearchSecureService,
securitySolutionUtils: SecuritySolutionServerlessUtils,
supertest: SecuritySolutionServerlessSuperTest,
kibana_ebt_server: KibanaEBTServerProvider,
};

View file

@ -47,40 +47,70 @@ export default ({ getService }: FtrProviderContext) => {
await retry.try(async () => {
const stats = await getSecurityTelemetryStats(supertest, log);
removeExtraFieldsFromTelemetryStats(stats);
expect(stats).to.eql({
detection_rules: [
[
{
name: 'security:telemetry-detection-rules',
passed: true,
},
],
expect(stats.detection_rules).to.eql([
[
{
name: 'security:telemetry-detection-rules',
passed: true,
},
],
security_lists: [
[
{
name: 'security:telemetry-lists',
passed: true,
},
],
]);
expect(stats.security_lists).to.eql([
[
{
name: 'security:telemetry-lists',
passed: true,
},
],
endpoints: [
[
{
name: 'security:endpoint-meta-telemetry',
passed: true,
},
],
]);
expect(stats.endpoints).to.eql([
[
{
name: 'security:endpoint-meta-telemetry',
passed: true,
},
],
diagnostics: [
[
{
name: 'security:endpoint-diagnostics',
passed: true,
},
],
]);
expect(stats.diagnostics).to.eql([
[
{
name: 'security:endpoint-diagnostics',
passed: true,
},
],
});
]);
expect(stats.indices_metadata).to.be.an('array');
const events = stats.indices_metadata as any[];
expect(events).to.not.be.empty();
const eventTypes = events.map((e) => e.eventType);
expect(eventTypes).to.contain('telemetry_index_stats_event');
expect(eventTypes).to.contain('telemetry_data_stream_event');
const indicesStats = events.find((e) => e.eventType === 'telemetry_index_stats_event');
expect(indicesStats).to.be.ok();
expect(indicesStats.eventData).to.be.ok();
expect(indicesStats.eventData.items).to.not.be.empty();
expect(indicesStats.eventData.items[0]).to.have.keys(
'index_name',
'query_total',
'query_time_in_millis',
'docs_count',
'docs_deleted',
'docs_total_size_in_bytes'
);
const dataStreamStats = events.find((e) => e.eventType === 'telemetry_data_stream_event');
expect(dataStreamStats).to.be.ok();
expect(dataStreamStats.eventData).to.be.ok();
expect(dataStreamStats.eventData.items).to.not.be.empty();
expect(dataStreamStats.eventData.items[0]).to.have.keys('datastream_name', 'indices');
});
});
});

View file

@ -5,19 +5,29 @@
* 2.0.
*/
import { unset } from 'lodash';
export const removeExtraFieldsFromTelemetryStats = (stats: any) => {
Object.entries(stats).forEach(([, value]: [unknown, any]) => {
value.forEach((entry: any, i: number) => {
entry.forEach((_e: any, j: number) => {
unset(value, `[${i}][${j}].time_executed_in_ms`);
unset(value, `[${i}][${j}].start_time`);
unset(value, `[${i}][${j}].end_time`);
unset(value, `[${i}][${j}].cluster_uuid`);
unset(value, `[${i}][${j}].cluster_name`);
unset(value, `[${i}][${j}].license`);
});
});
});
removeExtraFields(stats, [
'time_executed_in_ms',
'start_time',
'end_time',
'cluster_uuid',
'cluster_name',
'license',
]);
};
function removeExtraFields(obj: any, fields: string[]): void {
function traverseAndRemove(o: any): void {
if (typeof o !== 'object' || o === null) return;
for (const key in o) {
if (fields.includes(key)) {
delete o[key];
} else if (typeof o[key] === 'object') {
traverseAndRemove(o[key]);
}
}
}
traverseAndRemove(obj);
}

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FtrConfigProviderContext } from '@kbn/test';
export default async function ({ readConfigFile }: FtrConfigProviderContext) {
const functionalConfig = await readConfigFile(
require.resolve('../../../config/ess/config.base.basic')
);
return {
...functionalConfig.getAll(),
uiSettings: {},
testFiles: [require.resolve('..')],
junit: {
reportName: 'Security Solution - Telemetry Integration Tests - ESS Env',
},
};
}

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createTestConfig } from '../../../config/serverless/config.base';
export default createTestConfig({
testFiles: [require.resolve('..')],
suiteTags: { exclude: ['skipServerless'] },
junit: {
reportName: 'Security Solution - Telemetry Integration Tests - Serverless Env',
},
});

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FtrProviderContext } from '../../ftr_provider_context';
export default ({ loadTestFile }: FtrProviderContext): void => {
describe('Security Solution - Telemetry', function () {
loadTestFile(require.resolve('./tasks/indices_metadata'));
});
};

View file

@ -0,0 +1,175 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
TELEMETRY_DATA_STREAM_EVENT,
TELEMETRY_ILM_POLICY_EVENT,
TELEMETRY_ILM_STATS_EVENT,
TELEMETRY_INDEX_STATS_EVENT,
} from '@kbn/security-solution-plugin/server/lib/telemetry/event_based/events';
import { FtrProviderContext } from '../../../ftr_provider_context';
import {
cleanupDatastreams,
cleanupPolicies,
ensureBackingIndices,
launchTask,
randomDatastream,
randomIlmPolicy,
taskHasRun,
waitFor,
} from '../../../../common/utils/security_solution';
const TASK_ID = 'security:indices-metadata-telemetry:1.0.0';
const NUM_INDICES = 5;
export default ({ getService }: FtrProviderContext) => {
const ebtServer = getService('kibana_ebt_server');
const kibanaServer = getService('kibanaServer');
const logger = getService('log');
const es = getService('es');
describe('Indices metadata task telemetry', function () {
let dsName: string;
let policyName: string;
describe('@ess @serverless indices metadata', () => {
beforeEach(async () => {
dsName = await randomDatastream(es);
await ensureBackingIndices(dsName, NUM_INDICES, es);
});
afterEach(async () => {
await cleanupDatastreams(es);
});
it('should publish data stream events', async () => {
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
const opts = {
eventTypes: [TELEMETRY_DATA_STREAM_EVENT.eventType],
withTimeoutMs: 1000,
fromTimestamp: new Date().toISOString(),
};
await waitFor(
async () => {
const events = await ebtServer
.getEvents(Number.MAX_SAFE_INTEGER, opts)
.then((result) => result.map((ev) => ev.properties.items))
.then((result) => result.flat())
.then((result) => result.filter((ev) => (ev as any).datastream_name === dsName));
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
const eventCount = events.length;
return hasRun && eventCount === 1;
},
'waitForTaskToRun',
logger
);
});
it('should publish index stats events', async () => {
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
const opts = {
eventTypes: [TELEMETRY_INDEX_STATS_EVENT.eventType],
withTimeoutMs: 1000,
fromTimestamp: new Date().toISOString(),
};
// .ds-<ds-name>-YYYY.MM.DD-NNNNNN
const regex = new RegExp(`^\.ds-${dsName}-\\d{4}.\\d{2}.\\d{2}-\\d{6}$`);
await waitFor(
async () => {
const events = await ebtServer
.getEvents(Number.MAX_SAFE_INTEGER, opts)
.then((result) => result.map((ev) => ev.properties.items))
.then((result) => result.flat())
.then((result) =>
result.filter((ev) => regex.test((ev as any).index_name as string))
);
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
return hasRun && events.length === NUM_INDICES;
},
'waitForTaskToRun',
logger
);
});
});
describe('@ess indices metadata', function () {
this.tags('skipServerless');
beforeEach(async () => {
policyName = await randomIlmPolicy(es);
dsName = await randomDatastream(es, policyName);
await ensureBackingIndices(dsName, NUM_INDICES, es);
});
afterEach(async () => {
await cleanupDatastreams(es);
await cleanupPolicies(es);
});
it('should publish ilm policy events', async () => {
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
const opts = {
eventTypes: [TELEMETRY_ILM_POLICY_EVENT.eventType],
withTimeoutMs: 1000,
fromTimestamp: new Date().toISOString(),
};
await waitFor(
async () => {
const events = await ebtServer
.getEvents(Number.MAX_SAFE_INTEGER, opts)
.then((result) => result.map((ev) => ev.properties.items))
.then((result) => result.flat())
.then((result) => result.filter((ev) => (ev as any).policy_name === policyName));
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
return hasRun && events.length === 1;
},
'waitForTaskToRun',
logger
);
});
it('should publish ilm stats events', async () => {
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
const opts = {
eventTypes: [TELEMETRY_ILM_STATS_EVENT.eventType],
withTimeoutMs: 1000,
fromTimestamp: new Date().toISOString(),
};
await waitFor(
async () => {
const events = await ebtServer
.getEvents(Number.MAX_SAFE_INTEGER, opts)
.then((result) => result.map((ev) => ev.properties.items))
.then((result) => result.flat())
.then((result) => result.filter((ev) => (ev as any).policy_name === policyName));
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
return hasRun && events.length === NUM_INDICES;
},
'waitForTaskToRun',
logger
);
});
});
});
};

View file

@ -52,5 +52,6 @@
"@kbn/ftr-common-functional-ui-services",
"@kbn/spaces-plugin",
"@kbn/elastic-assistant-plugin",
"@kbn/test-suites-src",
]
}