mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[Telemetry][Security Solution] Index metadata collector (#194004)
## Summary Implements a security_solution task scheduled to run once a day to collect the following information: 1. Datastreams stats 2. Indices stats 3. ILMs stats 4. ILM configs The task allows a runtime configuration to limit the number of indices and data streams to analyze or event to disable the feature entirely. Once the data is gathered, the task sends it as EBT events. --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
2fd89943c5
commit
36b344a4c5
43 changed files with 3087 additions and 136 deletions
|
@ -89,6 +89,7 @@ enabled:
|
|||
- x-pack/test/security_solution_api_integration/test_suites/detections_response/rules_management/rule_read/trial_license_complete_tier/configs/serverless.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/detections_response/rules_management/rule_read/basic_license_essentials_tier/configs/serverless.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/detections_response/telemetry/trial_license_complete_tier/configs/serverless.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/telemetry/configs/serverless.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/detections_response/user_roles/trial_license_complete_tier/configs/serverless.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/genai/nlp_cleanup_task/trial_license_complete_tier/configs/serverless.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/genai/nlp_cleanup_task/basic_license_essentials_tier/configs/serverless.config.ts
|
||||
|
|
|
@ -70,6 +70,7 @@ enabled:
|
|||
- x-pack/test/security_solution_api_integration/test_suites/detections_response/rules_management/rule_read/trial_license_complete_tier/configs/ess.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/detections_response/rules_management/rule_read/basic_license_essentials_tier/configs/ess.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/detections_response/telemetry/trial_license_complete_tier/configs/ess.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/telemetry/configs/ess.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/detections_response/user_roles/trial_license_complete_tier/configs/ess.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/configs/ess.config.ts
|
||||
- x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/basic_license_essentials_tier/configs/ess.config.ts
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import Path from 'path';
|
||||
import axios from 'axios';
|
||||
|
||||
import { cloneDeep } from 'lodash';
|
||||
|
||||
import { telemetryConfiguration } from '../lib/telemetry/configuration';
|
||||
import {
|
||||
TaskManagerPlugin,
|
||||
type TaskManagerStartContract,
|
||||
} from '@kbn/task-manager-plugin/server/plugin';
|
||||
|
||||
import {
|
||||
setupTestServers,
|
||||
removeFile,
|
||||
mockAxiosPost,
|
||||
DEFAULT_GET_ROUTES,
|
||||
mockAxiosGet,
|
||||
getRandomInt,
|
||||
} from './lib/helpers';
|
||||
|
||||
import {
|
||||
type TestElasticsearchUtils,
|
||||
type TestKibanaUtils,
|
||||
} from '@kbn/core-test-helpers-kbn-server';
|
||||
import { Plugin as SecuritySolutionPlugin } from '../plugin';
|
||||
import { getTelemetryTasks, runSoonConfigTask } from './lib/telemetry_helpers';
|
||||
import type { SecurityTelemetryTask } from '../lib/telemetry/task';
|
||||
|
||||
jest.mock('axios');
|
||||
|
||||
const logFilePath = Path.join(__dirname, 'config.logs.log');
|
||||
const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');
|
||||
const securitySolutionStartSpy = jest.spyOn(SecuritySolutionPlugin.prototype, 'start');
|
||||
|
||||
const mockedAxiosGet = jest.spyOn(axios, 'get');
|
||||
const mockedAxiosPost = jest.spyOn(axios, 'post');
|
||||
|
||||
const securitySolutionPlugin = jest.spyOn(SecuritySolutionPlugin.prototype, 'start');
|
||||
|
||||
describe('configuration', () => {
|
||||
let esServer: TestElasticsearchUtils;
|
||||
let kibanaServer: TestKibanaUtils;
|
||||
let taskManagerPlugin: TaskManagerStartContract;
|
||||
let tasks: SecurityTelemetryTask[];
|
||||
|
||||
beforeAll(async () => {
|
||||
await removeFile(logFilePath);
|
||||
|
||||
const servers = await setupTestServers(logFilePath);
|
||||
|
||||
esServer = servers.esServer;
|
||||
kibanaServer = servers.kibanaServer;
|
||||
|
||||
expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
|
||||
taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;
|
||||
|
||||
expect(securitySolutionStartSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
tasks = getTelemetryTasks(securitySolutionStartSpy);
|
||||
|
||||
expect(securitySolutionPlugin).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
if (kibanaServer) {
|
||||
await kibanaServer.stop();
|
||||
}
|
||||
if (esServer) {
|
||||
await esServer.stop();
|
||||
}
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
jest.clearAllMocks();
|
||||
mockAxiosPost(mockedAxiosPost);
|
||||
});
|
||||
|
||||
afterEach(async () => {});
|
||||
|
||||
describe('configuration task', () => {
|
||||
it('should keep default values when no new config was provided', async () => {
|
||||
const before = cloneDeep(telemetryConfiguration);
|
||||
|
||||
await runSoonConfigTask(tasks, taskManagerPlugin);
|
||||
|
||||
expect(telemetryConfiguration).toEqual(before);
|
||||
});
|
||||
|
||||
it('should update values with new manifest', async () => {
|
||||
const expected = {
|
||||
telemetry_max_buffer_size: getRandomInt(1, 100),
|
||||
max_security_list_telemetry_batch: getRandomInt(1, 100),
|
||||
max_endpoint_telemetry_batch: getRandomInt(1, 100),
|
||||
max_detection_rule_telemetry_batch: getRandomInt(1, 100),
|
||||
max_detection_alerts_batch: getRandomInt(1, 100),
|
||||
use_async_sender: true,
|
||||
pagination_config: {
|
||||
max_page_size_bytes: getRandomInt(1, 100),
|
||||
num_docs_to_sample: getRandomInt(1, 100),
|
||||
},
|
||||
sender_channels: {
|
||||
default: {
|
||||
buffer_time_span_millis: getRandomInt(1, 100),
|
||||
inflight_events_threshold: getRandomInt(1, 100),
|
||||
max_payload_size_bytes: getRandomInt(1, 100),
|
||||
},
|
||||
},
|
||||
indices_metadata_config: {
|
||||
indices_threshold: getRandomInt(1, 100),
|
||||
datastreams_threshold: getRandomInt(1, 100),
|
||||
max_prefixes: getRandomInt(1, 100),
|
||||
max_group_size: getRandomInt(1, 100),
|
||||
},
|
||||
};
|
||||
|
||||
mockAxiosGet(mockedAxiosGet, [
|
||||
...DEFAULT_GET_ROUTES,
|
||||
[/.*telemetry-buffer-and-batch-sizes-v1.*/, { status: 200, data: cloneDeep(expected) }],
|
||||
]);
|
||||
|
||||
await runSoonConfigTask(tasks, taskManagerPlugin);
|
||||
|
||||
expect(telemetryConfiguration.telemetry_max_buffer_size).toEqual(
|
||||
expected.telemetry_max_buffer_size
|
||||
);
|
||||
expect(telemetryConfiguration.max_security_list_telemetry_batch).toEqual(
|
||||
expected.max_security_list_telemetry_batch
|
||||
);
|
||||
expect(telemetryConfiguration.max_endpoint_telemetry_batch).toEqual(
|
||||
expected.max_endpoint_telemetry_batch
|
||||
);
|
||||
expect(telemetryConfiguration.max_detection_rule_telemetry_batch).toEqual(
|
||||
expected.max_detection_rule_telemetry_batch
|
||||
);
|
||||
expect(telemetryConfiguration.max_detection_alerts_batch).toEqual(
|
||||
expected.max_detection_alerts_batch
|
||||
);
|
||||
expect(telemetryConfiguration.use_async_sender).toEqual(expected.use_async_sender);
|
||||
expect(telemetryConfiguration.sender_channels).toEqual(expected.sender_channels);
|
||||
expect(telemetryConfiguration.pagination_config).toEqual(expected.pagination_config);
|
||||
expect(telemetryConfiguration.indices_metadata_config).toEqual(
|
||||
expected.indices_metadata_config
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -10,6 +10,20 @@ import Util from 'util';
|
|||
import type { ElasticsearchClient } from '@kbn/core/server';
|
||||
import deepmerge from 'deepmerge';
|
||||
import { createTestServers, createRootWithCorePlugins } from '@kbn/core-test-helpers-kbn-server';
|
||||
|
||||
export const DEFAULT_GET_ROUTES: Array<[RegExp, unknown]> = [
|
||||
[new RegExp('.*/ping$'), { status: 200 }],
|
||||
[
|
||||
/.*kibana\/manifest\/artifacts.*/,
|
||||
{
|
||||
status: 200,
|
||||
data: 'x-pack/plugins/security_solution/server/lib/telemetry/__mocks__/kibana-artifacts.zip',
|
||||
},
|
||||
],
|
||||
];
|
||||
|
||||
export const DEFAULT_POST_ROUTES: Array<[RegExp, unknown]> = [[/.*/, { status: 200 }]];
|
||||
|
||||
const asyncUnlink = Util.promisify(Fs.unlink);
|
||||
|
||||
/**
|
||||
|
@ -127,3 +141,35 @@ export function updateTimestamps(data: object[]): object[] {
|
|||
return { ...d, '@timestamp': new Date(currentTimeMillis + (i + 1) * 100) };
|
||||
});
|
||||
}
|
||||
|
||||
export function mockAxiosPost(
|
||||
postSpy: jest.SpyInstance,
|
||||
routes: Array<[RegExp, unknown]> = DEFAULT_POST_ROUTES
|
||||
) {
|
||||
postSpy.mockImplementation(async (url: string) => {
|
||||
for (const [route, returnValue] of routes) {
|
||||
if (route.test(url)) {
|
||||
return returnValue;
|
||||
}
|
||||
}
|
||||
return { status: 404 };
|
||||
});
|
||||
}
|
||||
|
||||
export function mockAxiosGet(
|
||||
getSpy: jest.SpyInstance,
|
||||
routes: Array<[RegExp, unknown]> = DEFAULT_GET_ROUTES
|
||||
) {
|
||||
getSpy.mockImplementation(async (url: string) => {
|
||||
for (const [route, returnValue] of routes) {
|
||||
if (route.test(url)) {
|
||||
return returnValue;
|
||||
}
|
||||
}
|
||||
return { status: 404 };
|
||||
});
|
||||
}
|
||||
|
||||
export function getRandomInt(min: number, max: number): number {
|
||||
return Math.floor(Math.random() * (max - min + 1)) + min;
|
||||
}
|
||||
|
|
|
@ -25,13 +25,13 @@ import {
|
|||
deleteExceptionListItem,
|
||||
} from '@kbn/lists-plugin/server/services/exception_lists';
|
||||
import { LEGACY_AGENT_POLICY_SAVED_OBJECT_TYPE } from '@kbn/fleet-plugin/common/constants';
|
||||
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
|
||||
|
||||
import { packagePolicyService } from '@kbn/fleet-plugin/server/services';
|
||||
|
||||
import { ENDPOINT_ARTIFACT_LISTS } from '@kbn/securitysolution-list-constants';
|
||||
import { DETECTION_TYPE, NAMESPACE_TYPE } from '@kbn/lists-plugin/common/constants.mock';
|
||||
import { DEFAULT_DIAGNOSTIC_INDEX_PATTERN } from '../../../common/endpoint/constants';
|
||||
import { bulkInsert, updateTimestamps } from './helpers';
|
||||
import { bulkInsert, eventually, updateTimestamps } from './helpers';
|
||||
import { TelemetryEventsSender } from '../../lib/telemetry/sender';
|
||||
import type {
|
||||
SecuritySolutionPluginStart,
|
||||
|
@ -41,6 +41,7 @@ import type { SecurityTelemetryTask } from '../../lib/telemetry/task';
|
|||
import { Plugin as SecuritySolutionPlugin } from '../../plugin';
|
||||
import { AsyncTelemetryEventsSender } from '../../lib/telemetry/async_sender';
|
||||
import { type ITelemetryReceiver, TelemetryReceiver } from '../../lib/telemetry/receiver';
|
||||
import { DEFAULT_DIAGNOSTIC_INDEX_PATTERN } from '../../../common/endpoint/constants';
|
||||
import mockEndpointAlert from '../__mocks__/endpoint-alert.json';
|
||||
import mockedRule from '../__mocks__/rule.json';
|
||||
import fleetAgents from '../__mocks__/fleet-agents.json';
|
||||
|
@ -417,3 +418,24 @@ export function getTelemetryTaskType(task: SecurityTelemetryTask): string {
|
|||
return '';
|
||||
}
|
||||
}
|
||||
|
||||
export async function runSoonConfigTask(
|
||||
tasks: SecurityTelemetryTask[],
|
||||
taskManagerPlugin: TaskManagerStartContract
|
||||
) {
|
||||
const configTaskType = 'security:telemetry-configuration';
|
||||
const configTask = getTelemetryTask(tasks, configTaskType);
|
||||
const runAfter = new Date();
|
||||
await eventually(async () => {
|
||||
await taskManagerPlugin.runSoon(configTask.getTaskId());
|
||||
});
|
||||
|
||||
// wait until the task finishes
|
||||
await eventually(async () => {
|
||||
const hasRun = await taskManagerPlugin
|
||||
.get(configTask.getTaskId())
|
||||
.then((t) => new Date(t.state.lastExecutionTimestamp) > runAfter)
|
||||
.catch(() => false);
|
||||
expect(hasRun).toBe(true);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -20,7 +20,14 @@ import {
|
|||
TELEMETRY_CHANNEL_ENDPOINT_META,
|
||||
} from '../lib/telemetry/constants';
|
||||
|
||||
import { eventually, setupTestServers, removeFile } from './lib/helpers';
|
||||
import {
|
||||
eventually,
|
||||
setupTestServers,
|
||||
removeFile,
|
||||
mockAxiosGet,
|
||||
mockAxiosPost,
|
||||
DEFAULT_GET_ROUTES,
|
||||
} from './lib/helpers';
|
||||
import {
|
||||
cleanupMockedAlerts,
|
||||
cleanupMockedExceptionLists,
|
||||
|
@ -37,6 +44,7 @@ import {
|
|||
mockEndpointData,
|
||||
getTelemetryReceiver,
|
||||
mockPrebuiltRulesData,
|
||||
runSoonConfigTask,
|
||||
} from './lib/telemetry_helpers';
|
||||
|
||||
import {
|
||||
|
@ -99,6 +107,7 @@ describe('telemetry tasks', () => {
|
|||
},
|
||||
},
|
||||
});
|
||||
|
||||
esServer = servers.esServer;
|
||||
kibanaServer = servers.kibanaServer;
|
||||
|
||||
|
@ -134,7 +143,17 @@ describe('telemetry tasks', () => {
|
|||
|
||||
beforeEach(async () => {
|
||||
jest.clearAllMocks();
|
||||
mockAxiosGet();
|
||||
mockAxiosPost(mockedAxiosPost);
|
||||
mockAxiosGet(mockedAxiosGet, [
|
||||
...DEFAULT_GET_ROUTES,
|
||||
[
|
||||
/.*telemetry-buffer-and-batch-sizes-v1.*/,
|
||||
{
|
||||
status: 200,
|
||||
data: fakeBufferAndSizesConfigAsyncDisabled,
|
||||
},
|
||||
],
|
||||
]);
|
||||
deferred = [];
|
||||
});
|
||||
|
||||
|
@ -208,21 +227,17 @@ describe('telemetry tasks', () => {
|
|||
});
|
||||
|
||||
it('should use new sender when configured', async () => {
|
||||
const configTaskType = 'security:telemetry-configuration';
|
||||
const configTask = getTelemetryTask(tasks, configTaskType);
|
||||
mockAxiosPost(mockedAxiosPost);
|
||||
|
||||
mockAxiosGet(fakeBufferAndSizesConfigAsyncEnabled);
|
||||
await eventually(async () => {
|
||||
await taskManagerPlugin.runSoon(configTask.getTaskId());
|
||||
});
|
||||
mockAxiosGet(mockedAxiosGet, [
|
||||
...DEFAULT_GET_ROUTES,
|
||||
[
|
||||
/.*telemetry-buffer-and-batch-sizes-v1.*/,
|
||||
{ status: 200, data: fakeBufferAndSizesConfigAsyncEnabled },
|
||||
],
|
||||
]);
|
||||
|
||||
// wait until the task finishes
|
||||
await eventually(async () => {
|
||||
const found = (await taskManagerPlugin.fetch()).docs.find(
|
||||
(t) => t.taskType === configTaskType
|
||||
);
|
||||
expect(found).toBeFalsy();
|
||||
});
|
||||
await runSoonConfigTask(tasks, taskManagerPlugin);
|
||||
|
||||
const [task, started] = await mockAndScheduleDetectionRulesTask();
|
||||
|
||||
|
@ -238,13 +253,20 @@ describe('telemetry tasks', () => {
|
|||
|
||||
it('should update sender queue config', async () => {
|
||||
const expectedConfig = fakeBufferAndSizesConfigWithQueues.sender_channels['task-metrics'];
|
||||
const configTaskType = 'security:telemetry-configuration';
|
||||
const configTask = getTelemetryTask(tasks, configTaskType);
|
||||
|
||||
mockAxiosGet(fakeBufferAndSizesConfigWithQueues);
|
||||
await eventually(async () => {
|
||||
await taskManagerPlugin.runSoon(configTask.getTaskId());
|
||||
});
|
||||
mockAxiosPost(mockedAxiosPost);
|
||||
mockAxiosGet(mockedAxiosGet, [
|
||||
...DEFAULT_GET_ROUTES,
|
||||
[
|
||||
/.*telemetry-buffer-and-batch-sizes-v1.*/,
|
||||
{
|
||||
status: 200,
|
||||
data: fakeBufferAndSizesConfigWithQueues,
|
||||
},
|
||||
],
|
||||
]);
|
||||
|
||||
await runSoonConfigTask(tasks, taskManagerPlugin);
|
||||
|
||||
await eventually(async () => {
|
||||
/* eslint-disable dot-notation */
|
||||
|
@ -838,31 +860,6 @@ describe('telemetry tasks', () => {
|
|||
});
|
||||
}
|
||||
|
||||
function mockAxiosGet(bufferConfig: unknown = fakeBufferAndSizesConfigAsyncDisabled) {
|
||||
mockedAxiosPost.mockImplementation(
|
||||
async (_url: string, _data?: unknown, _config?: AxiosRequestConfig<unknown> | undefined) => {
|
||||
return { status: 200 };
|
||||
}
|
||||
);
|
||||
|
||||
mockedAxiosGet.mockImplementation(async (url: string) => {
|
||||
if (url.startsWith(ENDPOINT_STAGING) && url.endsWith('ping')) {
|
||||
return { status: 200 };
|
||||
} else if (url.indexOf('kibana/manifest/artifacts') !== -1) {
|
||||
return {
|
||||
status: 200,
|
||||
data: 'x-pack/plugins/security_solution/server/lib/telemetry/__mocks__/kibana-artifacts.zip',
|
||||
};
|
||||
} else if (url.indexOf('telemetry-buffer-and-batch-sizes-v1') !== -1) {
|
||||
return {
|
||||
status: 200,
|
||||
data: bufferConfig,
|
||||
};
|
||||
}
|
||||
return { status: 404 };
|
||||
});
|
||||
}
|
||||
|
||||
async function getTaskMetricsRequests(
|
||||
task: SecurityTelemetryTask,
|
||||
olderThan: number
|
||||
|
|
|
@ -15,6 +15,7 @@ import { getDetectionRulesPreview } from './utils/get_detecton_rules_preview';
|
|||
import { getSecurityListsPreview } from './utils/get_security_lists_preview';
|
||||
import { getEndpointPreview } from './utils/get_endpoint_preview';
|
||||
import { getDiagnosticsPreview } from './utils/get_diagnostics_preview';
|
||||
import { getIndicesMetadataPreview } from './utils/get_indices_metadata_preview';
|
||||
|
||||
export const telemetryDetectionRulesPreviewRoute = (
|
||||
router: SecuritySolutionPluginRouter,
|
||||
|
@ -62,12 +63,19 @@ export const telemetryDetectionRulesPreviewRoute = (
|
|||
telemetrySender,
|
||||
});
|
||||
|
||||
const indicesMetadata = await getIndicesMetadataPreview({
|
||||
logger,
|
||||
telemetryReceiver,
|
||||
telemetrySender,
|
||||
});
|
||||
|
||||
return response.ok({
|
||||
body: {
|
||||
detection_rules: detectionRules,
|
||||
security_lists: securityLists,
|
||||
endpoints,
|
||||
diagnostics,
|
||||
indices_metadata: indicesMetadata,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { Logger } from '@kbn/core/server';
|
||||
|
||||
import { PreviewTelemetryEventsSender } from '../../../../telemetry/preview_sender';
|
||||
import type { ITelemetryReceiver } from '../../../../telemetry/receiver';
|
||||
import { PreviewTaskMetricsService } from '../../../../telemetry/preview_task_metrics';
|
||||
import type { ITelemetryEventsSender } from '../../../../telemetry/sender';
|
||||
import { createTelemetryIndicesMetadataTaskConfig } from '../../../../telemetry/tasks/indices.metadata';
|
||||
|
||||
export const getIndicesMetadataPreview = async ({
|
||||
logger,
|
||||
telemetryReceiver,
|
||||
telemetrySender,
|
||||
}: {
|
||||
logger: Logger;
|
||||
telemetryReceiver: ITelemetryReceiver;
|
||||
telemetrySender: ITelemetryEventsSender;
|
||||
}): Promise<Array<{ eventType: string; eventData: object }>> => {
|
||||
const taskExecutionPeriod = {
|
||||
last: new Date(0).toISOString(),
|
||||
current: new Date().toISOString(),
|
||||
};
|
||||
|
||||
const taskSender = new PreviewTelemetryEventsSender(logger, telemetrySender);
|
||||
const taskMetricsService = new PreviewTaskMetricsService(logger, taskSender);
|
||||
const task = createTelemetryIndicesMetadataTaskConfig();
|
||||
await task.runTask(
|
||||
'indices-metadata-telemetry',
|
||||
logger,
|
||||
telemetryReceiver,
|
||||
taskSender,
|
||||
taskMetricsService,
|
||||
taskExecutionPeriod
|
||||
);
|
||||
return taskSender.getEbtEventsSent();
|
||||
};
|
File diff suppressed because it is too large
Load diff
|
@ -8,7 +8,7 @@ import axios from 'axios';
|
|||
import * as rx from 'rxjs';
|
||||
import _, { cloneDeep } from 'lodash';
|
||||
|
||||
import type { Logger, LogMeta } from '@kbn/core/server';
|
||||
import type { AnalyticsServiceSetup, EventTypeOpts, Logger, LogMeta } from '@kbn/core/server';
|
||||
import type { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server';
|
||||
import { type IUsageCounter } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counter';
|
||||
import type { ITelemetryReceiver } from './receiver';
|
||||
|
@ -17,7 +17,7 @@ import {
|
|||
type QueueConfig,
|
||||
type RetryConfig,
|
||||
} from './async_sender.types';
|
||||
import { TelemetryChannel, TelemetryCounter } from './types';
|
||||
import { type Nullable, TelemetryChannel, TelemetryCounter } from './types';
|
||||
import * as collections from './collections_helpers';
|
||||
import { CachedSubject, retryOnError$ } from './rxjs_helpers';
|
||||
import { SenderUtils } from './sender_helpers';
|
||||
|
@ -55,6 +55,8 @@ export class AsyncTelemetryEventsSender implements IAsyncTelemetryEventsSender {
|
|||
private telemetryUsageCounter?: IUsageCounter;
|
||||
private senderUtils: SenderUtils | undefined;
|
||||
|
||||
private analytics: Nullable<AnalyticsServiceSetup>;
|
||||
|
||||
constructor(logger: Logger) {
|
||||
this.logger = newTelemetryLogger(logger.get('telemetry_events.async_sender'));
|
||||
}
|
||||
|
@ -64,7 +66,8 @@ export class AsyncTelemetryEventsSender implements IAsyncTelemetryEventsSender {
|
|||
fallbackQueueConfig: QueueConfig,
|
||||
telemetryReceiver: ITelemetryReceiver,
|
||||
telemetrySetup?: TelemetryPluginSetup,
|
||||
telemetryUsageCounter?: IUsageCounter
|
||||
telemetryUsageCounter?: IUsageCounter,
|
||||
analytics?: AnalyticsServiceSetup
|
||||
): void {
|
||||
this.logger.l(`Setting up ${AsyncTelemetryEventsSender.name}`);
|
||||
|
||||
|
@ -77,6 +80,7 @@ export class AsyncTelemetryEventsSender implements IAsyncTelemetryEventsSender {
|
|||
this.telemetryReceiver = telemetryReceiver;
|
||||
this.telemetrySetup = telemetrySetup;
|
||||
this.telemetryUsageCounter = telemetryUsageCounter;
|
||||
this.analytics = analytics;
|
||||
|
||||
this.updateStatus(ServiceStatus.CONFIGURED);
|
||||
}
|
||||
|
@ -201,6 +205,13 @@ export class AsyncTelemetryEventsSender implements IAsyncTelemetryEventsSender {
|
|||
}
|
||||
}
|
||||
|
||||
public reportEBT<T>(eventTypeOpts: EventTypeOpts<T>, eventData: T): void {
|
||||
if (!this.analytics) {
|
||||
throw Error('analytics is unavailable');
|
||||
}
|
||||
this.analytics.reportEvent(eventTypeOpts.eventType, eventData as object);
|
||||
}
|
||||
|
||||
// internal methods
|
||||
private queue$(
|
||||
upstream$: rx.Observable<Event>,
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
*/
|
||||
import type { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server';
|
||||
import { type IUsageCounter } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counter';
|
||||
import type { AnalyticsServiceSetup, EventTypeOpts } from '@kbn/core-analytics-server';
|
||||
import { type TelemetryChannel } from './types';
|
||||
import type { ITelemetryReceiver } from './receiver';
|
||||
|
||||
|
@ -20,7 +21,8 @@ export interface IAsyncTelemetryEventsSender {
|
|||
fallbackQueueConfig: QueueConfig,
|
||||
telemetryReceiver: ITelemetryReceiver,
|
||||
telemetrySetup?: TelemetryPluginSetup,
|
||||
telemetryUsageCounter?: IUsageCounter
|
||||
telemetryUsageCounter?: IUsageCounter,
|
||||
analytics?: AnalyticsServiceSetup
|
||||
) => void;
|
||||
start: (telemetryStart?: TelemetryPluginStart) => void;
|
||||
stop: () => Promise<void>;
|
||||
|
@ -28,6 +30,7 @@ export interface IAsyncTelemetryEventsSender {
|
|||
simulateSend: (channel: TelemetryChannel, events: unknown[]) => string[];
|
||||
updateQueueConfig: (channel: TelemetryChannel, config: QueueConfig) => void;
|
||||
updateDefaultQueueConfig: (config: QueueConfig) => void;
|
||||
reportEBT: <T>(eventTypeOpts: EventTypeOpts<T>, eventData: T) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -4,7 +4,14 @@
|
|||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import { chunked, chunkedBy } from './collections_helpers';
|
||||
import { stagingIndices } from './__mocks__/staging_indices';
|
||||
import {
|
||||
type CommonPrefixesConfig,
|
||||
chunked,
|
||||
chunkedBy,
|
||||
findCommonPrefixes,
|
||||
chunkStringsByMaxLength,
|
||||
} from './collections_helpers';
|
||||
|
||||
describe('telemetry.utils.chunked', () => {
|
||||
it('should chunk simple case', async () => {
|
||||
|
@ -79,3 +86,105 @@ describe('telemetry.utils.chunkedBy', () => {
|
|||
expect(output).toEqual([['aaaa']]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('telemetry.utils.findCommonPrefixes', () => {
|
||||
it('should find common prefixes in simple case', async () => {
|
||||
const indices = ['aaa', 'b', 'aa'];
|
||||
const config: CommonPrefixesConfig = {
|
||||
maxPrefixes: 10,
|
||||
maxGroupSize: 10,
|
||||
minPrefixSize: 1,
|
||||
};
|
||||
|
||||
const output = findCommonPrefixes(indices, config);
|
||||
|
||||
expect(output).toHaveLength(2);
|
||||
expect(output.find((v, _) => v.parts.length === 1 && v.parts[0] === 'a')?.indexCount).toEqual(
|
||||
2
|
||||
);
|
||||
expect(output.find((v, _) => v.parts.length === 1 && v.parts[0] === 'b')?.indexCount).toEqual(
|
||||
1
|
||||
);
|
||||
});
|
||||
|
||||
it('should find common prefixes with different minPrefixSize', async () => {
|
||||
const indices = ['.ds-AA-0001', '.ds-AA-0002', '.ds-BB-0003'];
|
||||
const config: CommonPrefixesConfig = {
|
||||
maxPrefixes: 10,
|
||||
maxGroupSize: 3,
|
||||
minPrefixSize: 5,
|
||||
};
|
||||
|
||||
const output = findCommonPrefixes(indices, config);
|
||||
|
||||
expect(output).toHaveLength(2);
|
||||
expect(
|
||||
output.find((v, _) => v.parts.length === 1 && v.parts[0] === '.ds-A')?.indexCount
|
||||
).toEqual(2);
|
||||
expect(
|
||||
output.find((v, _) => v.parts.length === 1 && v.parts[0] === '.ds-B')?.indexCount
|
||||
).toEqual(1);
|
||||
});
|
||||
|
||||
it('should discard extra indices', async () => {
|
||||
const indices = ['aaa', 'aaaaaa', 'aa'];
|
||||
const config: CommonPrefixesConfig = {
|
||||
maxPrefixes: 1,
|
||||
maxGroupSize: 2,
|
||||
minPrefixSize: 3,
|
||||
};
|
||||
|
||||
const output = findCommonPrefixes(indices, config);
|
||||
|
||||
expect(output).toHaveLength(1);
|
||||
expect(output.find((v, _) => v.parts.length === 1 && v.parts[0] === 'aaa')?.indexCount).toEqual(
|
||||
2
|
||||
);
|
||||
});
|
||||
|
||||
it('should group many indices', async () => {
|
||||
const indices = stagingIndices;
|
||||
const config: CommonPrefixesConfig = {
|
||||
maxPrefixes: 8,
|
||||
maxGroupSize: 100,
|
||||
minPrefixSize: 3,
|
||||
};
|
||||
|
||||
const output = findCommonPrefixes(indices, config);
|
||||
|
||||
expect(output).toHaveLength(config.maxPrefixes);
|
||||
expect(output.map((v, _) => v.indexCount).reduce((acc, i) => acc + i, 0)).toBe(indices.length);
|
||||
});
|
||||
});
|
||||
|
||||
describe('telemetry.utils.splitIndicesByNameLength', () => {
|
||||
it('should chunk simple case', async () => {
|
||||
const input = ['aa', 'b', 'ccc', 'ddd'];
|
||||
const output = chunkStringsByMaxLength(input, 5);
|
||||
expect(output).toEqual([['aa', 'b'], ['ccc'], ['ddd']]);
|
||||
});
|
||||
|
||||
it('should chunk with remainder', async () => {
|
||||
const input = ['aaa', 'b'];
|
||||
const output = chunkStringsByMaxLength(input, 10);
|
||||
expect(output).toEqual([['aaa', 'b']]);
|
||||
});
|
||||
|
||||
it('should chunk with empty list', async () => {
|
||||
const input: string[] = [];
|
||||
const output = chunkStringsByMaxLength(input, 3);
|
||||
expect(output).toEqual([]);
|
||||
});
|
||||
|
||||
it('should chunk with single element smaller than max weight', async () => {
|
||||
const input = ['aa'];
|
||||
const output = chunkStringsByMaxLength(input, 3);
|
||||
expect(output).toEqual([['aa']]);
|
||||
});
|
||||
|
||||
it('should chunk with single element bigger than max weight', async () => {
|
||||
const input = ['aaaa', 'bb'];
|
||||
const output = chunkStringsByMaxLength(input, 4);
|
||||
expect(output).toEqual([['aaaa'], ['bb']]);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -63,3 +63,153 @@ class Chunked<T> {
|
|||
return this.chunks.filter((chunk) => chunk.length > 0);
|
||||
}
|
||||
}
|
||||
|
||||
export interface CommonPrefixesConfig {
|
||||
maxPrefixes: number;
|
||||
maxGroupSize: number;
|
||||
minPrefixSize: number;
|
||||
}
|
||||
|
||||
interface TrieNode {
|
||||
char: string;
|
||||
prefix: string;
|
||||
children: { [key: string]: TrieNode };
|
||||
count: number;
|
||||
isEnd: boolean;
|
||||
id: number;
|
||||
}
|
||||
|
||||
interface Group {
|
||||
parts: string[];
|
||||
indexCount: number;
|
||||
}
|
||||
|
||||
function newTrieNode(char: string = '', prefix: string = '', id: number = 0): TrieNode {
|
||||
return {
|
||||
char,
|
||||
children: {},
|
||||
count: 0,
|
||||
id,
|
||||
isEnd: false,
|
||||
prefix,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds and groups common prefixes from a list of strings.
|
||||
*
|
||||
* @param {string[]} indices - An array of strings from which common prefixes will be extracted.
|
||||
* @param {CommonPrefixesConfig} config - A configuration object that defines the rules for grouping.
|
||||
*
|
||||
* The `config` object contains the following properties:
|
||||
* - maxGroupSize {number}: The maximum number of indices allowed in a group.
|
||||
* - maxPrefixes {number}: The maximum number of prefix groups to return.
|
||||
* - minPrefixSize {number}: The minimum length of a prefix required to form a group. It avoid cases like returning
|
||||
* a single character prefix, e.g., ['.ds-...1', '.ds-....2', ....] -> returns a single group '.'
|
||||
*
|
||||
* @returns {Group[]} - An array of groups where each group contains a list of prefix parts and the count of indices that share that prefix.
|
||||
*
|
||||
* Example usage:
|
||||
*
|
||||
* ```typescript
|
||||
* const indices = ['apple', 'appetizer', 'application', 'banana', 'band', 'bandage'];
|
||||
* const config = {
|
||||
* maxGroupSize: 5,
|
||||
* maxPrefixes: 3,
|
||||
* minPrefixSize: 3
|
||||
* };
|
||||
*
|
||||
* const result = findCommonPrefixes(indices, config);
|
||||
* //result = [
|
||||
* // { parts: [ 'ban' ], indexCount: 3 },
|
||||
* // { parts: [ 'app' ], indexCount: 3 }
|
||||
* //]
|
||||
* ```
|
||||
*/
|
||||
|
||||
export function findCommonPrefixes(indices: string[], config: CommonPrefixesConfig): Group[] {
|
||||
const idCounter = function* (): Generator<number, number, number> {
|
||||
let id = 0;
|
||||
while (true) {
|
||||
yield id++;
|
||||
}
|
||||
};
|
||||
|
||||
const idGen = idCounter();
|
||||
|
||||
const root = newTrieNode('', '', idGen.next().value);
|
||||
for (const index of indices) {
|
||||
let node = root;
|
||||
node.count++;
|
||||
for (const char of index) {
|
||||
if (!node.children[char]) {
|
||||
node.children[char] = newTrieNode(char, node.prefix + char, idGen.next().value);
|
||||
}
|
||||
node = node.children[char];
|
||||
node.count++;
|
||||
}
|
||||
node.isEnd = true;
|
||||
}
|
||||
|
||||
const nodes = [root];
|
||||
const prefixes: Group[] = [];
|
||||
|
||||
while (nodes.length > 0) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
const node = nodes.pop()!;
|
||||
if (
|
||||
(node.count <= config.maxGroupSize && node.prefix.length >= config.minPrefixSize) ||
|
||||
(Object.keys(node.children).length === 0 && node.prefix.length >= config.minPrefixSize)
|
||||
) {
|
||||
const group: Group = {
|
||||
parts: [node.prefix],
|
||||
indexCount: node.count,
|
||||
};
|
||||
prefixes.push(group);
|
||||
} else {
|
||||
for (const child of Object.values(node.children)) {
|
||||
nodes.push(child);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (prefixes.length > config.maxPrefixes) {
|
||||
prefixes.sort((a, b) => a.indexCount - b.indexCount);
|
||||
|
||||
while (prefixes.length > config.maxPrefixes) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
const g1 = prefixes.shift()!;
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
const g2 = prefixes.shift()!;
|
||||
const mergedGroup: Group = {
|
||||
parts: g1.parts.concat(g2.parts),
|
||||
indexCount: g1.indexCount + g2.indexCount,
|
||||
};
|
||||
prefixes.push(mergedGroup);
|
||||
prefixes.sort((a, b) => a.indexCount - b.indexCount);
|
||||
}
|
||||
}
|
||||
|
||||
return prefixes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Splits an array of strings into chunks where the total length of strings in each chunk
|
||||
* does not exceed the specified `maxLength`.
|
||||
*
|
||||
* @param strings - An array of strings to be chunked.
|
||||
* @param maxLength - The maximum total length allowed for strings in each chunk. Defaults to 1024.
|
||||
* @returns A two-dimensional array where each inner array is a chunk of strings.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const strings = ["hello", "world", "this", "is", "a", "test"];
|
||||
* const chunks = chunkStringsByMaxLength(strings, 10);
|
||||
* console.log(chunks);
|
||||
* // Output: [["hello", "world"], ["this", "is"], ["a", "test"]]
|
||||
* ```
|
||||
*/
|
||||
export function chunkStringsByMaxLength(strings: string[], maxLength: number = 3072): string[][] {
|
||||
// plus 1 for the comma separator
|
||||
return chunkedBy(strings, maxLength, (index) => index.length + 1);
|
||||
}
|
||||
|
|
|
@ -6,7 +6,11 @@
|
|||
*/
|
||||
|
||||
import os from 'os';
|
||||
import type { PaginationConfiguration, TelemetrySenderChannelConfiguration } from './types';
|
||||
import type {
|
||||
IndicesMetadataConfiguration,
|
||||
PaginationConfiguration,
|
||||
TelemetrySenderChannelConfiguration,
|
||||
} from './types';
|
||||
|
||||
class TelemetryConfigurationDTO {
|
||||
private readonly DEFAULT_TELEMETRY_MAX_BUFFER_SIZE = 100;
|
||||
|
@ -21,6 +25,15 @@ class TelemetryConfigurationDTO {
|
|||
max_page_size_bytes: Math.min(os.totalmem() * 0.02, 80 * 1024 * 1024),
|
||||
num_docs_to_sample: 10,
|
||||
};
|
||||
private readonly DEFAULT_INDICES_METADATA_CONFIG = {
|
||||
indices_threshold: 10000,
|
||||
datastreams_threshold: 1000,
|
||||
|
||||
max_prefixes: 10, // @deprecated
|
||||
max_group_size: 100, // @deprecated
|
||||
min_group_size: 5, // @deprecated
|
||||
};
|
||||
|
||||
private _telemetry_max_buffer_size = this.DEFAULT_TELEMETRY_MAX_BUFFER_SIZE;
|
||||
private _max_security_list_telemetry_batch = this.DEFAULT_MAX_SECURITY_LIST_TELEMETRY_BATCH;
|
||||
private _max_endpoint_telemetry_batch = this.DEFAULT_MAX_ENDPOINT_TELEMETRY_BATCH;
|
||||
|
@ -31,6 +44,8 @@ class TelemetryConfigurationDTO {
|
|||
[key: string]: TelemetrySenderChannelConfiguration;
|
||||
} = this.DEFAULT_SENDER_CHANNELS;
|
||||
private _pagination_config: PaginationConfiguration = this.DEFAULT_PAGINATION_CONFIG;
|
||||
private _indices_metadata_config: IndicesMetadataConfiguration =
|
||||
this.DEFAULT_INDICES_METADATA_CONFIG;
|
||||
|
||||
public get telemetry_max_buffer_size(): number {
|
||||
return this._telemetry_max_buffer_size;
|
||||
|
@ -96,6 +111,14 @@ class TelemetryConfigurationDTO {
|
|||
return this._pagination_config;
|
||||
}
|
||||
|
||||
public set indices_metadata_config(paginationConfiguration: IndicesMetadataConfiguration) {
|
||||
this._indices_metadata_config = paginationConfiguration;
|
||||
}
|
||||
|
||||
public get indices_metadata_config(): IndicesMetadataConfiguration {
|
||||
return this._indices_metadata_config;
|
||||
}
|
||||
|
||||
public resetAllToDefault() {
|
||||
this._telemetry_max_buffer_size = this.DEFAULT_TELEMETRY_MAX_BUFFER_SIZE;
|
||||
this._max_security_list_telemetry_batch = this.DEFAULT_MAX_SECURITY_LIST_TELEMETRY_BATCH;
|
||||
|
@ -104,6 +127,7 @@ class TelemetryConfigurationDTO {
|
|||
this._max_detection_alerts_batch = this.DEFAULT_MAX_DETECTION_ALERTS_BATCH;
|
||||
this._sender_channels = this.DEFAULT_SENDER_CHANNELS;
|
||||
this._pagination_config = this.DEFAULT_PAGINATION_CONFIG;
|
||||
this._indices_metadata_config = this.DEFAULT_INDICES_METADATA_CONFIG;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ import type {
|
|||
ResponseActionsApiCommandNames,
|
||||
} from '../../../../common/endpoint/service/response_actions/constants';
|
||||
import type { BulkUpsertAssetCriticalityRecordsResponse } from '../../../../common/api/entity_analytics';
|
||||
import type { DataStreams, IlmPolicies, IlmsStats, IndicesStats } from '../indices.metadata.types';
|
||||
|
||||
export const RISK_SCORE_EXECUTION_SUCCESS_EVENT: EventTypeOpts<{
|
||||
scoresWritten: number;
|
||||
|
@ -271,6 +272,241 @@ export const ALERT_SUPPRESSION_EVENT: EventTypeOpts<{
|
|||
},
|
||||
};
|
||||
|
||||
export const TELEMETRY_DATA_STREAM_EVENT: EventTypeOpts<DataStreams> = {
|
||||
eventType: 'telemetry_data_stream_event',
|
||||
schema: {
|
||||
items: {
|
||||
type: 'array',
|
||||
items: {
|
||||
properties: {
|
||||
datastream_name: {
|
||||
type: 'keyword',
|
||||
_meta: { description: 'Name of the data stream' },
|
||||
},
|
||||
indices: {
|
||||
type: 'array',
|
||||
items: {
|
||||
properties: {
|
||||
index_name: { type: 'date', _meta: { description: 'Index name' } },
|
||||
ilm_policy: { type: 'date', _meta: { optional: true, description: 'ILM policy' } },
|
||||
},
|
||||
},
|
||||
_meta: { optional: true, description: 'Indices associated with the data stream' },
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: { description: 'Datastreams' },
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const TELEMETRY_INDEX_STATS_EVENT: EventTypeOpts<IndicesStats> = {
|
||||
eventType: 'telemetry_index_stats_event',
|
||||
schema: {
|
||||
items: {
|
||||
type: 'array',
|
||||
items: {
|
||||
properties: {
|
||||
index_name: {
|
||||
type: 'keyword',
|
||||
_meta: { description: 'The name of the index being monitored.' },
|
||||
},
|
||||
query_total: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
optional: true,
|
||||
description: 'The total number of search queries executed on the index.',
|
||||
},
|
||||
},
|
||||
query_time_in_millis: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'The total time spent on query execution across all search requests, measured in milliseconds.',
|
||||
},
|
||||
},
|
||||
docs_count: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
optional: true,
|
||||
description: 'The total number of documents currently stored in the index.',
|
||||
},
|
||||
},
|
||||
docs_deleted: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'The total number of documents that have been marked as deleted in the index.',
|
||||
},
|
||||
},
|
||||
docs_total_size_in_bytes: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'The total size, in bytes, of all documents stored in the index, including storage overhead.',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: { description: 'Datastreams' },
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const TELEMETRY_ILM_POLICY_EVENT: EventTypeOpts<IlmPolicies> = {
|
||||
eventType: 'telemetry_ilm_policy_event',
|
||||
schema: {
|
||||
items: {
|
||||
type: 'array',
|
||||
items: {
|
||||
properties: {
|
||||
policy_name: {
|
||||
type: 'keyword',
|
||||
_meta: { description: 'The name of the ILM policy.' },
|
||||
},
|
||||
modified_date: {
|
||||
type: 'date',
|
||||
_meta: { description: 'The date when the ILM policy was last modified.' },
|
||||
},
|
||||
phases: {
|
||||
properties: {
|
||||
cold: {
|
||||
properties: {
|
||||
min_age: {
|
||||
type: 'text',
|
||||
_meta: {
|
||||
description:
|
||||
'The minimum age before the index transitions to the "cold" phase.',
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'Configuration settings for the "cold" phase of the ILM policy, applied when data is infrequently accessed.',
|
||||
},
|
||||
},
|
||||
delete: {
|
||||
properties: {
|
||||
min_age: {
|
||||
type: 'text',
|
||||
_meta: {
|
||||
description:
|
||||
'The minimum age before the index transitions to the "delete" phase.',
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'Configuration settings for the "delete" phase of the ILM policy, specifying when the index should be removed.',
|
||||
},
|
||||
},
|
||||
frozen: {
|
||||
properties: {
|
||||
min_age: {
|
||||
type: 'text',
|
||||
_meta: {
|
||||
description:
|
||||
'The minimum age before the index transitions to the "frozen" phase.',
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'Configuration settings for the "frozen" phase of the ILM policy, where data is fully searchable but stored with a reduced resource footprint.',
|
||||
},
|
||||
},
|
||||
hot: {
|
||||
properties: {
|
||||
min_age: {
|
||||
type: 'text',
|
||||
_meta: {
|
||||
description:
|
||||
'The minimum age before the index transitions to the "hot" phase.',
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'Configuration settings for the "hot" phase of the ILM policy, applied to actively written and queried data.',
|
||||
},
|
||||
},
|
||||
warm: {
|
||||
properties: {
|
||||
min_age: {
|
||||
type: 'text',
|
||||
_meta: {
|
||||
description:
|
||||
'The minimum age before the index transitions to the "warm" phase.',
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'Configuration settings for the "warm" phase of the ILM policy, used for read-only data that is less frequently accessed.',
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: {
|
||||
description:
|
||||
'The different phases of the ILM policy that define how the index is managed over time.',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: { description: 'Datastreams' },
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const TELEMETRY_ILM_STATS_EVENT: EventTypeOpts<IlmsStats> = {
|
||||
eventType: 'telemetry_ilm_stats_event',
|
||||
schema: {
|
||||
items: {
|
||||
type: 'array',
|
||||
items: {
|
||||
properties: {
|
||||
index_name: {
|
||||
type: 'keyword',
|
||||
_meta: { description: 'The name of the index currently managed by the ILM policy.' },
|
||||
},
|
||||
phase: {
|
||||
type: 'keyword',
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'The current phase of the ILM policy that the index is in (e.g., hot, warm, cold, frozen, or delete).',
|
||||
},
|
||||
},
|
||||
age: {
|
||||
type: 'text',
|
||||
_meta: {
|
||||
optional: true,
|
||||
description:
|
||||
'The age of the index since its creation, indicating how long it has existed.',
|
||||
},
|
||||
},
|
||||
policy_name: {
|
||||
type: 'keyword',
|
||||
_meta: {
|
||||
optional: true,
|
||||
description: 'The name of the ILM policy applied to this index.',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: { description: 'Datastreams' },
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
interface CreateAssetCriticalityProcessedFileEvent {
|
||||
result?: BulkUpsertAssetCriticalityRecordsResponse['stats'];
|
||||
startTime: Date;
|
||||
|
@ -457,4 +693,8 @@ export const events = [
|
|||
ENTITY_ENGINE_RESOURCE_INIT_FAILURE_EVENT,
|
||||
ENTITY_ENGINE_INITIALIZATION_EVENT,
|
||||
ENTITY_STORE_USAGE_EVENT,
|
||||
TELEMETRY_DATA_STREAM_EVENT,
|
||||
TELEMETRY_ILM_POLICY_EVENT,
|
||||
TELEMETRY_ILM_STATS_EVENT,
|
||||
TELEMETRY_INDEX_STATS_EVENT,
|
||||
];
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { DateTime } from '@elastic/elasticsearch/lib/api/types';
|
||||
import type { Nullable } from './types';
|
||||
|
||||
export interface IlmPolicies {
|
||||
items: IlmPolicy[];
|
||||
}
|
||||
|
||||
export interface IlmPolicy {
|
||||
policy_name: string;
|
||||
modified_date: DateTime;
|
||||
phases: IlmPhases;
|
||||
}
|
||||
|
||||
export interface IlmPhases {
|
||||
cold: Nullable<IlmPhase>;
|
||||
delete: Nullable<IlmPhase>;
|
||||
frozen: Nullable<IlmPhase>;
|
||||
hot: Nullable<IlmPhase>;
|
||||
warm: Nullable<IlmPhase>;
|
||||
}
|
||||
|
||||
export interface IlmPhase {
|
||||
min_age: string;
|
||||
}
|
||||
|
||||
export interface IlmsStats {
|
||||
items: IlmStats[];
|
||||
}
|
||||
|
||||
export interface IlmStats {
|
||||
index_name: string;
|
||||
phase?: string;
|
||||
age?: string;
|
||||
policy_name?: string;
|
||||
}
|
||||
|
||||
export interface IndicesStats {
|
||||
items: IndexStats[];
|
||||
}
|
||||
|
||||
export interface IndexStats {
|
||||
index_name: string;
|
||||
query_total?: number;
|
||||
query_time_in_millis?: number;
|
||||
docs_count?: number;
|
||||
docs_deleted?: number;
|
||||
docs_total_size_in_bytes?: number;
|
||||
}
|
||||
|
||||
export interface Index {
|
||||
index_name: string;
|
||||
ilm_policy?: string;
|
||||
}
|
||||
|
||||
export interface DataStreams {
|
||||
items: DataStream[];
|
||||
}
|
||||
export interface DataStream {
|
||||
datastream_name: string;
|
||||
indices?: Index[];
|
||||
}
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
import type { AxiosInstance, AxiosResponse } from 'axios';
|
||||
import axios, { AxiosHeaders } from 'axios';
|
||||
import type { Logger } from '@kbn/core/server';
|
||||
import type { EventTypeOpts, Logger } from '@kbn/core/server';
|
||||
import type { TelemetryPluginStart, TelemetryPluginSetup } from '@kbn/telemetry-plugin/server';
|
||||
import type { UsageCounter } from '@kbn/usage-collection-plugin/server';
|
||||
|
||||
|
@ -37,6 +37,9 @@ export class PreviewTelemetryEventsSender implements ITelemetryEventsSender {
|
|||
/** Last sent message */
|
||||
private sentMessages: string[] = [];
|
||||
|
||||
/** Last sent EBT events */
|
||||
private ebtEventsSent: Array<{ eventType: string; eventData: object }> = [];
|
||||
|
||||
/** Logger for this class */
|
||||
private logger: Logger;
|
||||
|
||||
|
@ -87,6 +90,10 @@ export class PreviewTelemetryEventsSender implements ITelemetryEventsSender {
|
|||
return this.sentMessages;
|
||||
}
|
||||
|
||||
public getEbtEventsSent(): Array<{ eventType: string; eventData: object }> {
|
||||
return this.ebtEventsSent;
|
||||
}
|
||||
|
||||
public setup(
|
||||
telemetryReceiver: ITelemetryReceiver,
|
||||
telemetrySetup?: TelemetryPluginSetup,
|
||||
|
@ -174,4 +181,12 @@ export class PreviewTelemetryEventsSender implements ITelemetryEventsSender {
|
|||
public updateDefaultQueueConfig(config: QueueConfig): void {
|
||||
this.composite.updateDefaultQueueConfig(config);
|
||||
}
|
||||
|
||||
public reportEBT<T>(eventTypeOpts: EventTypeOpts<T>, eventData: T): void {
|
||||
this.ebtEventsSent.push({
|
||||
eventType: eventTypeOpts.eventType,
|
||||
eventData: eventData as object,
|
||||
});
|
||||
this.composite.reportEBT(eventTypeOpts, eventData);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import type {
|
|||
} from '@kbn/core/server';
|
||||
import type {
|
||||
AggregationsAggregate,
|
||||
IlmExplainLifecycleRequest,
|
||||
OpenPointInTimeResponse,
|
||||
SearchRequest,
|
||||
SearchResponse,
|
||||
|
@ -38,6 +39,10 @@ import type {
|
|||
SearchHit,
|
||||
SearchRequest as ESSearchRequest,
|
||||
SortResults,
|
||||
IndicesGetDataStreamRequest,
|
||||
IndicesStatsRequest,
|
||||
IlmGetLifecycleRequest,
|
||||
IndicesGetRequest,
|
||||
} from '@elastic/elasticsearch/lib/api/types';
|
||||
import type { TransportResult } from '@elastic/elasticsearch';
|
||||
import type { AgentPolicy, Installation } from '@kbn/fleet-plugin/common';
|
||||
|
@ -87,6 +92,16 @@ import { telemetryConfiguration } from './configuration';
|
|||
import { ENDPOINT_METRICS_INDEX } from '../../../common/constants';
|
||||
import { PREBUILT_RULES_PACKAGE_NAME } from '../../../common/detection_engine/constants';
|
||||
import type { TelemetryLogger } from './telemetry_logger';
|
||||
import type {
|
||||
DataStream,
|
||||
IlmPhase,
|
||||
IlmPhases,
|
||||
IlmPolicy,
|
||||
IlmStats,
|
||||
Index,
|
||||
IndexStats,
|
||||
} from './indices.metadata.types';
|
||||
import { chunkStringsByMaxLength } from './collections_helpers';
|
||||
|
||||
export interface ITelemetryReceiver {
|
||||
start(
|
||||
|
@ -238,6 +253,12 @@ export interface ITelemetryReceiver {
|
|||
setMaxPageSizeBytes(bytes: number): void;
|
||||
|
||||
setNumDocsToSample(n: number): void;
|
||||
|
||||
getIndices(): Promise<string[]>;
|
||||
getDataStreams(): Promise<DataStream[]>;
|
||||
getIndicesStats(indices: string[]): AsyncGenerator<IndexStats, void, unknown>;
|
||||
getIlmsStats(indices: string[]): AsyncGenerator<IlmStats, void, unknown>;
|
||||
getIlmsPolicies(ilms: string[]): AsyncGenerator<IlmPolicy, void, unknown>;
|
||||
}
|
||||
|
||||
export class TelemetryReceiver implements ITelemetryReceiver {
|
||||
|
@ -532,7 +553,6 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
const buckets = endpointMetadataResponse?.aggregations?.endpoint_metadata?.buckets ?? [];
|
||||
|
||||
return buckets.reduce((cache, endpointAgentId) => {
|
||||
// const id = endpointAgentId.latest_metadata.hits.hits[0]._id;
|
||||
const doc = endpointAgentId.latest_metadata.hits.hits[0]._source;
|
||||
cache.set(endpointAgentId.key, doc);
|
||||
return cache;
|
||||
|
@ -541,7 +561,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
}
|
||||
|
||||
public async *fetchDiagnosticAlertsBatch(executeFrom: string, executeTo: string) {
|
||||
this.logger.debug('Searching diagnostic alerts', {
|
||||
this.logger.l('Searching diagnostic alerts', {
|
||||
from: executeFrom,
|
||||
to: executeTo,
|
||||
} as LogMeta);
|
||||
|
@ -585,10 +605,10 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
fetchMore = false;
|
||||
}
|
||||
|
||||
this.logger.debug('Diagnostic alerts to return', { numOfHits } as LogMeta);
|
||||
this.logger.l('Diagnostic alerts to return', { numOfHits } as LogMeta);
|
||||
fetchMore = numOfHits > 0 && numOfHits < telemetryConfiguration.telemetry_max_buffer_size;
|
||||
} catch (e) {
|
||||
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
|
||||
this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta);
|
||||
fetchMore = false;
|
||||
}
|
||||
|
||||
|
@ -761,7 +781,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
executeFrom: string,
|
||||
executeTo: string
|
||||
) {
|
||||
this.logger.debug('Searching prebuilt rule alerts from', {
|
||||
this.logger.l('Searching prebuilt rule alerts from', {
|
||||
executeFrom,
|
||||
executeTo,
|
||||
} as LogMeta);
|
||||
|
@ -899,14 +919,14 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
pitId = response?.pit_id;
|
||||
}
|
||||
|
||||
this.logger.debug('Prebuilt rule alerts to return', { alerts: alerts.length } as LogMeta);
|
||||
this.logger.l('Prebuilt rule alerts to return', { alerts: alerts.length } as LogMeta);
|
||||
|
||||
yield alerts;
|
||||
}
|
||||
} catch (e) {
|
||||
// to keep backward compatibility with the previous implementation, silent return
|
||||
// once we start using `paginate` this error should be managed downstream
|
||||
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
|
||||
this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta);
|
||||
return;
|
||||
} finally {
|
||||
await this.closePointInTime(pitId);
|
||||
|
@ -930,10 +950,10 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
try {
|
||||
await this.esClient().closePointInTime({ id: pitId });
|
||||
} catch (error) {
|
||||
this.logger.l('Error trying to close point in time', {
|
||||
this.logger.warn('Error trying to close point in time', {
|
||||
pit: pitId,
|
||||
error: JSON.stringify(error),
|
||||
});
|
||||
error_message: error.message,
|
||||
} as LogMeta);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1019,7 +1039,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
|
||||
fetchMore = numOfHits > 0;
|
||||
} catch (e) {
|
||||
this.logger.l('Error fetching alerts', { error: JSON.stringify(e) });
|
||||
this.logger.warn('Error fetching alerts', { error_message: e.message } as LogMeta);
|
||||
fetchMore = false;
|
||||
}
|
||||
|
||||
|
@ -1034,11 +1054,11 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
try {
|
||||
await this.esClient().closePointInTime({ id: pitId });
|
||||
} catch (error) {
|
||||
this.logger.l('Error trying to close point in time', {
|
||||
this.logger.warn('Error trying to close point in time', {
|
||||
pit: pitId,
|
||||
error: JSON.stringify(error),
|
||||
error_message: error.message,
|
||||
keepAlive,
|
||||
});
|
||||
} as LogMeta);
|
||||
}
|
||||
|
||||
this.logger.l('Timeline alerts to return', { alerts: alertsToReturn.length });
|
||||
|
@ -1232,7 +1252,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
|
||||
return ret.license;
|
||||
} catch (err) {
|
||||
this.logger.l('failed retrieving license', { error: JSON.stringify(err) });
|
||||
this.logger.warn('failed retrieving license', { error_message: err.message } as LogMeta);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
@ -1293,7 +1313,7 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
yield data;
|
||||
} while (esQuery.search_after !== undefined);
|
||||
} catch (e) {
|
||||
this.logger.l('Error running paginated query', { error: JSON.stringify(e) });
|
||||
this.logger.warn('Error running paginated query', { error_message: e.message } as LogMeta);
|
||||
throw e;
|
||||
} finally {
|
||||
await this.closePointInTime(pit.id);
|
||||
|
@ -1320,4 +1340,198 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
}
|
||||
return this._esClient;
|
||||
}
|
||||
|
||||
public async getIndices(): Promise<string[]> {
|
||||
const es = this.esClient();
|
||||
|
||||
this.logger.l('Fetching indices');
|
||||
|
||||
const request: IndicesGetRequest = {
|
||||
index: '*',
|
||||
expand_wildcards: ['open', 'hidden'],
|
||||
filter_path: ['*.settings.index.provided_name'],
|
||||
};
|
||||
|
||||
return es.indices
|
||||
.get(request)
|
||||
.then((indices) => Array.from(Object.keys(indices)))
|
||||
.catch((error) => {
|
||||
this.logger.warn('Error fetching indices', { error_message: error } as LogMeta);
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
|
||||
public async getDataStreams(): Promise<DataStream[]> {
|
||||
const es = this.esClient();
|
||||
|
||||
this.logger.l('Fetching datstreams');
|
||||
|
||||
const request: IndicesGetDataStreamRequest = {
|
||||
name: '*',
|
||||
expand_wildcards: ['open', 'hidden'],
|
||||
filter_path: ['data_streams.name', 'data_streams.indices'],
|
||||
};
|
||||
|
||||
return es.indices
|
||||
.getDataStream(request)
|
||||
.then((response) =>
|
||||
response.data_streams.map((ds) => {
|
||||
return {
|
||||
datastream_name: ds.name,
|
||||
indices:
|
||||
ds.indices?.map((index) => {
|
||||
return {
|
||||
index_name: index.index_name,
|
||||
ilm_policy: index.ilm_policy,
|
||||
} as Index;
|
||||
}) ?? [],
|
||||
} as DataStream;
|
||||
})
|
||||
)
|
||||
.catch((error) => {
|
||||
this.logger.warn('Error fetching datastreams', { error_message: error } as LogMeta);
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
|
||||
public async *getIndicesStats(indices: string[]) {
|
||||
const es = this.esClient();
|
||||
|
||||
this.logger.l('Fetching indices stats');
|
||||
|
||||
const groupedIndices = chunkStringsByMaxLength(indices);
|
||||
|
||||
this.logger.l('Splitted indices into groups', {
|
||||
groups: groupedIndices.length,
|
||||
indices: indices.length,
|
||||
} as LogMeta);
|
||||
|
||||
for (const group of groupedIndices) {
|
||||
const request: IndicesStatsRequest = {
|
||||
index: group,
|
||||
level: 'indices',
|
||||
metric: ['docs', 'search', 'store'],
|
||||
expand_wildcards: ['open', 'hidden'],
|
||||
filter_path: [
|
||||
'indices.*.total.search.query_total',
|
||||
'indices.*.total.search.query_time_in_millis',
|
||||
'indices.*.total.docs.count',
|
||||
'indices.*.total.docs.deleted',
|
||||
'indices.*.total.store.size_in_bytes',
|
||||
],
|
||||
};
|
||||
|
||||
try {
|
||||
const response = await es.indices.stats(request);
|
||||
for (const [indexName, stats] of Object.entries(response.indices ?? {})) {
|
||||
yield {
|
||||
index_name: indexName,
|
||||
query_total: stats.total?.search?.query_total,
|
||||
query_time_in_millis: stats.total?.search?.query_time_in_millis,
|
||||
docs_count: stats.total?.docs?.count,
|
||||
docs_deleted: stats.total?.docs?.deleted,
|
||||
docs_total_size_in_bytes: stats.total?.store?.size_in_bytes,
|
||||
} as IndexStats;
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn('Error fetching indices stats', { error_message: error } as LogMeta);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async *getIlmsStats(indices: string[]) {
|
||||
const es = this.esClient();
|
||||
|
||||
const groupedIndices = chunkStringsByMaxLength(indices);
|
||||
|
||||
this.logger.l('Splitted ilms into groups', {
|
||||
groups: groupedIndices.length,
|
||||
indices: indices.length,
|
||||
} as LogMeta);
|
||||
|
||||
for (const group of groupedIndices) {
|
||||
const request: IlmExplainLifecycleRequest = {
|
||||
index: group.join(','),
|
||||
only_managed: false,
|
||||
filter_path: ['indices.*.phase', 'indices.*.age', 'indices.*.policy'],
|
||||
};
|
||||
|
||||
const data = await es.ilm.explainLifecycle(request);
|
||||
|
||||
try {
|
||||
for (const [indexName, stats] of Object.entries(data.indices ?? {})) {
|
||||
const entry = {
|
||||
index_name: indexName,
|
||||
phase: ('phase' in stats && stats.phase) || undefined,
|
||||
age: ('age' in stats && stats.age) || undefined,
|
||||
policy_name: ('policy' in stats && stats.policy) || undefined,
|
||||
} as IlmStats;
|
||||
|
||||
yield entry;
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn('Error fetching ilm stats', { error_message: error } as LogMeta);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async *getIlmsPolicies(ilms: string[]) {
|
||||
const es = this.esClient();
|
||||
|
||||
const phase = (obj: unknown): Nullable<IlmPhase> => {
|
||||
let value: Nullable<IlmPhase>;
|
||||
if (obj !== null && obj !== undefined && typeof obj === 'object' && 'min_age' in obj) {
|
||||
value = {
|
||||
min_age: obj.min_age,
|
||||
} as IlmPhase;
|
||||
}
|
||||
return value;
|
||||
};
|
||||
|
||||
const groupedIlms = chunkStringsByMaxLength(ilms);
|
||||
|
||||
this.logger.l('Splitted ilms into groups', {
|
||||
groups: groupedIlms.length,
|
||||
ilms: ilms.length,
|
||||
} as LogMeta);
|
||||
|
||||
for (const group of groupedIlms) {
|
||||
this.logger.l('Fetching ilm policies');
|
||||
const request: IlmGetLifecycleRequest = {
|
||||
name: group.join(','),
|
||||
filter_path: [
|
||||
'*.policy.phases.cold.min_age',
|
||||
'*.policy.phases.delete.min_age',
|
||||
'*.policy.phases.frozen.min_age',
|
||||
'*.policy.phases.hot.min_age',
|
||||
'*.policy.phases.warm.min_age',
|
||||
'*.modified_date',
|
||||
],
|
||||
};
|
||||
|
||||
const response = await es.ilm.getLifecycle(request);
|
||||
try {
|
||||
for (const [policyName, stats] of Object.entries(response ?? {})) {
|
||||
yield {
|
||||
policy_name: policyName,
|
||||
modified_date: stats.modified_date,
|
||||
phases: {
|
||||
cold: phase(stats.policy.phases.cold),
|
||||
delete: phase(stats.policy.phases.delete),
|
||||
frozen: phase(stats.policy.phases.frozen),
|
||||
hot: phase(stats.policy.phases.hot),
|
||||
warm: phase(stats.policy.phases.warm),
|
||||
} as IlmPhases,
|
||||
} as IlmPolicy;
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn('Error fetching ilm policies', {
|
||||
error_message: error.message,
|
||||
} as LogMeta);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import { cloneDeep } from 'lodash';
|
|||
import { URL } from 'url';
|
||||
import { transformDataToNdjson } from '@kbn/securitysolution-utils';
|
||||
|
||||
import type { Logger, LogMeta } from '@kbn/core/server';
|
||||
import type { EventTypeOpts, Logger, LogMeta } from '@kbn/core/server';
|
||||
import type { TelemetryPluginStart, TelemetryPluginSetup } from '@kbn/telemetry-plugin/server';
|
||||
import type { UsageCounter } from '@kbn/usage-collection-plugin/server';
|
||||
import type { AxiosInstance } from 'axios';
|
||||
|
@ -88,6 +88,11 @@ export interface ITelemetryEventsSender {
|
|||
* Updates the default queue configuration.
|
||||
*/
|
||||
updateDefaultQueueConfig: (config: QueueConfig) => void;
|
||||
|
||||
/**
|
||||
* Reports EBT events
|
||||
*/
|
||||
reportEBT: <T>(eventTypeOpts: EventTypeOpts<T>, eventData: T) => void;
|
||||
}
|
||||
|
||||
export class TelemetryEventsSender implements ITelemetryEventsSender {
|
||||
|
@ -270,12 +275,16 @@ export class TelemetryEventsSender implements ITelemetryEventsSender {
|
|||
const telemetryUrl = await this.fetchTelemetryPingUrl();
|
||||
const resp = await axios.get(telemetryUrl, { timeout: 3000 });
|
||||
if (resp.status === 200) {
|
||||
this.logger.l('[Security Telemetry] elastic telemetry services are reachable');
|
||||
this.logger.debug('Elastic telemetry services are reachable');
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
} catch (_err) {
|
||||
} catch (e) {
|
||||
this.logger.warn('Error pinging telemetry services', {
|
||||
error: e.message,
|
||||
} as LogMeta);
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -426,6 +435,10 @@ export class TelemetryEventsSender implements ITelemetryEventsSender {
|
|||
this.getAsyncTelemetrySender().updateDefaultQueueConfig(config);
|
||||
}
|
||||
|
||||
public reportEBT<T>(eventTypeOpts: EventTypeOpts<T>, eventData: T): void {
|
||||
this.getAsyncTelemetrySender().reportEBT(eventTypeOpts, eventData);
|
||||
}
|
||||
|
||||
private getAsyncTelemetrySender(): IAsyncTelemetryEventsSender {
|
||||
if (!this.asyncTelemetrySender) {
|
||||
throw new Error('Telemetry Sender V2 not initialized');
|
||||
|
|
|
@ -16,6 +16,7 @@ import {
|
|||
createMockTaskMetrics,
|
||||
createMockSecurityTelemetryTask,
|
||||
} from './__mocks__';
|
||||
import { newTelemetryLogger } from './helpers';
|
||||
|
||||
describe('test security telemetry task', () => {
|
||||
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
|
||||
|
@ -66,7 +67,7 @@ describe('test security telemetry task', () => {
|
|||
|
||||
expect(mockTelemetryTaskConfig.runTask).toHaveBeenCalledWith(
|
||||
telemetryTask.getTaskId(),
|
||||
logger,
|
||||
newTelemetryLogger(logger.get('task')),
|
||||
mockTelemetryReceiver,
|
||||
mockTelemetryEventsSender,
|
||||
mockTaskMetrics,
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
*/
|
||||
|
||||
import moment from 'moment';
|
||||
import type { Logger } from '@kbn/core/server';
|
||||
import type { Logger, LogMeta } from '@kbn/core/server';
|
||||
import type {
|
||||
ConcreteTaskInstance,
|
||||
TaskManagerSetupContract,
|
||||
|
@ -15,8 +15,9 @@ import type {
|
|||
import type { ITelemetryReceiver } from './receiver';
|
||||
import type { ITelemetryEventsSender } from './sender';
|
||||
import type { ITaskMetricsService } from './task_metrics.types';
|
||||
import { tlog } from './helpers';
|
||||
import { stateSchemaByVersion, emptyState, type LatestTaskStateSchema } from './task_state';
|
||||
import { newTelemetryLogger } from './helpers';
|
||||
import { type TelemetryLogger } from './telemetry_logger';
|
||||
|
||||
export interface SecurityTelemetryTaskConfig {
|
||||
type: string;
|
||||
|
@ -49,7 +50,7 @@ export type LastExecutionTimestampCalculator = (
|
|||
|
||||
export class SecurityTelemetryTask {
|
||||
private readonly config: SecurityTelemetryTaskConfig;
|
||||
private readonly logger: Logger;
|
||||
private readonly logger: TelemetryLogger;
|
||||
private readonly sender: ITelemetryEventsSender;
|
||||
private readonly receiver: ITelemetryReceiver;
|
||||
private readonly taskMetricsService: ITaskMetricsService;
|
||||
|
@ -62,7 +63,7 @@ export class SecurityTelemetryTask {
|
|||
taskMetricsService: ITaskMetricsService
|
||||
) {
|
||||
this.config = config;
|
||||
this.logger = logger;
|
||||
this.logger = newTelemetryLogger(logger.get('task'));
|
||||
this.sender = sender;
|
||||
this.receiver = receiver;
|
||||
this.taskMetricsService = taskMetricsService;
|
||||
|
@ -122,7 +123,7 @@ export class SecurityTelemetryTask {
|
|||
|
||||
public start = async (taskManager: TaskManagerStartContract) => {
|
||||
const taskId = this.getTaskId();
|
||||
tlog(this.logger, `[task ${taskId}]: attempting to schedule`);
|
||||
this.logger.debug('Attempting to schedule task', { taskId } as LogMeta);
|
||||
try {
|
||||
await taskManager.ensureScheduled({
|
||||
id: taskId,
|
||||
|
@ -135,30 +136,32 @@ export class SecurityTelemetryTask {
|
|||
params: { version: this.config.version },
|
||||
});
|
||||
} catch (e) {
|
||||
this.logger.error(`[task ${taskId}]: error scheduling task, received ${e.message}`);
|
||||
this.logger.error('Error scheduling task', {
|
||||
error: e.message,
|
||||
} as LogMeta);
|
||||
}
|
||||
};
|
||||
|
||||
public runTask = async (taskId: string, executionPeriod: TaskExecutionPeriod) => {
|
||||
tlog(this.logger, `[task ${taskId}]: attempting to run`);
|
||||
this.logger.debug('Attempting to run', { taskId } as LogMeta);
|
||||
if (taskId !== this.getTaskId()) {
|
||||
tlog(this.logger, `[task ${taskId}]: outdated task`);
|
||||
this.logger.info('outdated task', { taskId } as LogMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const isOptedIn = await this.sender.isTelemetryOptedIn();
|
||||
if (!isOptedIn) {
|
||||
tlog(this.logger, `[task ${taskId}]: telemetry is not opted-in`);
|
||||
this.logger.info('Telemetry is not opted-in', { taskId } as LogMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const isTelemetryServicesReachable = await this.sender.isTelemetryServicesReachable();
|
||||
if (!isTelemetryServicesReachable) {
|
||||
tlog(this.logger, `[task ${taskId}]: cannot reach telemetry services`);
|
||||
this.logger.info('Cannot reach telemetry services', { taskId } as LogMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tlog(this.logger, `[task ${taskId}]: running task`);
|
||||
this.logger.debug('Running task', { taskId } as LogMeta);
|
||||
return this.config.runTask(
|
||||
taskId,
|
||||
this.logger,
|
||||
|
|
|
@ -29,7 +29,11 @@ export class TaskMetricsService implements ITaskMetricsService {
|
|||
public async end(trace: Trace, error?: Error): Promise<void> {
|
||||
const event = this.createTaskMetric(trace, error);
|
||||
|
||||
this.logger.l(`Task ${event.name} complete. Task run took ${event.time_executed_in_ms}ms`);
|
||||
this.logger.l('Task completed', {
|
||||
task_name: event.name,
|
||||
time_executed_in_ms: event.time_executed_in_ms,
|
||||
error_message: event.error_message,
|
||||
});
|
||||
|
||||
if (telemetryConfiguration.use_async_sender) {
|
||||
this.sender.sendAsync(TelemetryChannel.TASK_METRICS, [event]);
|
||||
|
|
|
@ -51,7 +51,7 @@ export function createTelemetryConfigurationTaskConfig() {
|
|||
const configArtifact = manifest.data as unknown as TelemetryConfiguration;
|
||||
|
||||
log.l('Got telemetry configuration artifact', {
|
||||
artifact: configArtifact,
|
||||
artifact: configArtifact ?? '<null>',
|
||||
});
|
||||
|
||||
telemetryConfiguration.max_detection_alerts_batch =
|
||||
|
@ -107,6 +107,11 @@ export function createTelemetryConfigurationTaskConfig() {
|
|||
_receiver.setNumDocsToSample(configArtifact.pagination_config.num_docs_to_sample);
|
||||
}
|
||||
|
||||
if (configArtifact.indices_metadata_config) {
|
||||
log.l('Updating indices metadata configuration');
|
||||
telemetryConfiguration.indices_metadata_config = configArtifact.indices_metadata_config;
|
||||
}
|
||||
|
||||
await taskMetricsService.end(trace);
|
||||
|
||||
log.l('Updated TelemetryConfiguration', { configuration: telemetryConfiguration });
|
||||
|
|
|
@ -16,6 +16,7 @@ import { createTelemetryDiagnosticTimelineTaskConfig } from './timelines_diagnos
|
|||
import { createTelemetryConfigurationTaskConfig } from './configuration';
|
||||
import { telemetryConfiguration } from '../configuration';
|
||||
import { createTelemetryFilterListArtifactTaskConfig } from './filterlists';
|
||||
import { createTelemetryIndicesMetadataTaskConfig } from './indices.metadata';
|
||||
|
||||
export function createTelemetryTaskConfigs(): SecurityTelemetryTaskConfig[] {
|
||||
return [
|
||||
|
@ -30,5 +31,6 @@ export function createTelemetryTaskConfigs(): SecurityTelemetryTaskConfig[] {
|
|||
createTelemetryDiagnosticTimelineTaskConfig(),
|
||||
createTelemetryConfigurationTaskConfig(),
|
||||
createTelemetryFilterListArtifactTaskConfig(),
|
||||
createTelemetryIndicesMetadataTaskConfig(),
|
||||
];
|
||||
}
|
||||
|
|
|
@ -0,0 +1,192 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { LogMeta, Logger } from '@kbn/core/server';
|
||||
import type { ITelemetryEventsSender } from '../sender';
|
||||
import type { ITelemetryReceiver } from '../receiver';
|
||||
import type { TaskExecutionPeriod } from '../task';
|
||||
import type { ITaskMetricsService } from '../task_metrics.types';
|
||||
import {
|
||||
createUsageCounterLabel,
|
||||
getPreviousDailyTaskTimestamp,
|
||||
newTelemetryLogger,
|
||||
} from '../helpers';
|
||||
import {
|
||||
TELEMETRY_DATA_STREAM_EVENT,
|
||||
TELEMETRY_ILM_POLICY_EVENT,
|
||||
TELEMETRY_ILM_STATS_EVENT,
|
||||
TELEMETRY_INDEX_STATS_EVENT,
|
||||
} from '../event_based/events';
|
||||
import { telemetryConfiguration } from '../configuration';
|
||||
import type {
|
||||
DataStream,
|
||||
DataStreams,
|
||||
IlmPolicies,
|
||||
IlmsStats,
|
||||
IndicesStats,
|
||||
} from '../indices.metadata.types';
|
||||
import { TelemetryCounter } from '../types';
|
||||
|
||||
const COUNTER_LABELS = ['security_solution', 'indices-metadata'];
|
||||
|
||||
export function createTelemetryIndicesMetadataTaskConfig() {
|
||||
const taskType = 'security:indices-metadata-telemetry';
|
||||
return {
|
||||
type: taskType,
|
||||
title: 'Security Solution Telemetry Indices Metadata task',
|
||||
interval: '24h',
|
||||
timeout: '1m',
|
||||
version: '1.0.0',
|
||||
getLastExecutionTime: getPreviousDailyTaskTimestamp,
|
||||
runTask: async (
|
||||
taskId: string,
|
||||
logger: Logger,
|
||||
receiver: ITelemetryReceiver,
|
||||
sender: ITelemetryEventsSender,
|
||||
taskMetricsService: ITaskMetricsService,
|
||||
taskExecutionPeriod: TaskExecutionPeriod
|
||||
) => {
|
||||
const mdc = { task_id: taskId, task_execution_period: taskExecutionPeriod };
|
||||
const log = newTelemetryLogger(logger.get('indices-metadata'), mdc);
|
||||
const trace = taskMetricsService.start(taskType);
|
||||
|
||||
const taskConfig = telemetryConfiguration.indices_metadata_config;
|
||||
|
||||
const publishDatastreamsStats = (stats: DataStream[]): number => {
|
||||
const events: DataStreams = {
|
||||
items: stats,
|
||||
};
|
||||
sender.reportEBT(TELEMETRY_DATA_STREAM_EVENT, events);
|
||||
log.info(`Sent data streams`, { count: events.items.length } as LogMeta);
|
||||
return events.items.length;
|
||||
};
|
||||
|
||||
const publishIndicesStats = async (indices: string[]): Promise<number> => {
|
||||
const indicesStats: IndicesStats = {
|
||||
items: [],
|
||||
};
|
||||
|
||||
for await (const stat of receiver.getIndicesStats(indices)) {
|
||||
indicesStats.items.push(stat);
|
||||
}
|
||||
sender.reportEBT(TELEMETRY_INDEX_STATS_EVENT, indicesStats);
|
||||
log.info(`Sent indices stats`, { count: indicesStats.items.length } as LogMeta);
|
||||
return indicesStats.items.length;
|
||||
};
|
||||
|
||||
const publishIlmStats = async (indices: string[]): Promise<Set<string>> => {
|
||||
const ilmNames = new Set<string>();
|
||||
const ilmsStats: IlmsStats = {
|
||||
items: [],
|
||||
};
|
||||
|
||||
for await (const stat of receiver.getIlmsStats(indices)) {
|
||||
if (stat.policy_name !== undefined) {
|
||||
ilmNames.add(stat.policy_name);
|
||||
ilmsStats.items.push(stat);
|
||||
}
|
||||
}
|
||||
|
||||
sender.reportEBT(TELEMETRY_ILM_STATS_EVENT, ilmsStats);
|
||||
log.info(`Sent ILM stats`, { count: ilmNames.size } as LogMeta);
|
||||
|
||||
return ilmNames;
|
||||
};
|
||||
|
||||
const publishIlmPolicies = async (ilmNames: Set<string>): Promise<number> => {
|
||||
const ilmPolicies: IlmPolicies = {
|
||||
items: [],
|
||||
};
|
||||
|
||||
for await (const policy of receiver.getIlmsPolicies(Array.from(ilmNames.values()))) {
|
||||
ilmPolicies.items.push(policy);
|
||||
}
|
||||
sender.reportEBT(TELEMETRY_ILM_POLICY_EVENT, ilmPolicies);
|
||||
log.info('Sent ILM policies', { count: ilmPolicies.items.length } as LogMeta);
|
||||
return ilmPolicies.items.length;
|
||||
};
|
||||
|
||||
const incrementCounter = (type: TelemetryCounter, name: string, value: number) => {
|
||||
const telemetryUsageCounter = sender.getTelemetryUsageCluster();
|
||||
telemetryUsageCounter?.incrementCounter({
|
||||
counterName: createUsageCounterLabel(COUNTER_LABELS.concat(name)),
|
||||
counterType: type,
|
||||
incrementBy: value,
|
||||
});
|
||||
};
|
||||
|
||||
try {
|
||||
// 1. Get cluster stats and list of indices and datastreams
|
||||
const [indices, dataStreams] = await Promise.all([
|
||||
receiver.getIndices(),
|
||||
receiver.getDataStreams(),
|
||||
]);
|
||||
|
||||
// 2. Publish datastreams stats
|
||||
const dsCount = publishDatastreamsStats(
|
||||
dataStreams.slice(0, taskConfig.datastreams_threshold)
|
||||
);
|
||||
incrementCounter(TelemetryCounter.DOCS_SENT, 'datastreams-stats', dsCount);
|
||||
|
||||
// 3. Get and publish indices stats
|
||||
const indicesCount: number = await publishIndicesStats(
|
||||
indices.slice(0, taskConfig.indices_threshold)
|
||||
)
|
||||
.then((count) => {
|
||||
incrementCounter(TelemetryCounter.DOCS_SENT, 'indices-stats', count);
|
||||
return count;
|
||||
})
|
||||
.catch((err) => {
|
||||
log.warn(`Error getting indices stats`, { error: err.message } as LogMeta);
|
||||
incrementCounter(TelemetryCounter.RUNTIME_ERROR, 'indices-stats', 1);
|
||||
return 0;
|
||||
});
|
||||
|
||||
// 4. Get ILM stats and publish them
|
||||
const ilmNames = await publishIlmStats(indices.slice(0, taskConfig.indices_threshold))
|
||||
.then((names) => {
|
||||
incrementCounter(TelemetryCounter.DOCS_SENT, 'ilm-stats', names.size);
|
||||
return names;
|
||||
})
|
||||
.catch((err) => {
|
||||
log.warn(`Error getting ILM stats`, { error: err.message } as LogMeta);
|
||||
incrementCounter(TelemetryCounter.RUNTIME_ERROR, 'ilm-stats', 1);
|
||||
return new Set<string>();
|
||||
});
|
||||
|
||||
// 5. Publish ILM policies
|
||||
const policyCount = await publishIlmPolicies(ilmNames)
|
||||
.then((count) => {
|
||||
incrementCounter(TelemetryCounter.DOCS_SENT, 'ilm-policies', count);
|
||||
return count;
|
||||
})
|
||||
.catch((err) => {
|
||||
log.warn(`Error getting ILM policies`, { error: err.message } as LogMeta);
|
||||
incrementCounter(TelemetryCounter.RUNTIME_ERROR, 'ilm-policies', 1);
|
||||
return 0;
|
||||
});
|
||||
|
||||
log.info(`Sent EBT events`, {
|
||||
datastreams: dsCount,
|
||||
ilms: ilmNames.size,
|
||||
indices: indicesCount,
|
||||
policies: policyCount,
|
||||
} as LogMeta);
|
||||
|
||||
await taskMetricsService.end(trace);
|
||||
|
||||
return indicesCount;
|
||||
} catch (err) {
|
||||
log.warn(`Error running indices metadata task`, {
|
||||
error: err.message,
|
||||
} as LogMeta);
|
||||
await taskMetricsService.end(trace, err);
|
||||
return 0;
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
|
@ -465,6 +465,15 @@ export interface TelemetryConfiguration {
|
|||
[key: string]: TelemetrySenderChannelConfiguration;
|
||||
};
|
||||
pagination_config?: PaginationConfiguration;
|
||||
indices_metadata_config?: IndicesMetadataConfiguration;
|
||||
}
|
||||
|
||||
export interface IndicesMetadataConfiguration {
|
||||
indices_threshold: number;
|
||||
datastreams_threshold: number;
|
||||
max_prefixes: number;
|
||||
max_group_size: number;
|
||||
min_group_size: number;
|
||||
}
|
||||
|
||||
export interface PaginationConfiguration {
|
||||
|
|
|
@ -507,7 +507,8 @@ export class Plugin implements ISecuritySolutionPlugin {
|
|||
DEFAULT_QUEUE_CONFIG,
|
||||
this.telemetryReceiver,
|
||||
plugins.telemetry,
|
||||
this.telemetryUsageCounter
|
||||
this.telemetryUsageCounter,
|
||||
core.analytics
|
||||
);
|
||||
|
||||
this.telemetryEventsSender.setup(
|
||||
|
|
|
@ -5,9 +5,10 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
export * from './rules';
|
||||
export * from './alerts';
|
||||
export * from './delete_all_anomalies';
|
||||
export * from './count_down_test';
|
||||
export * from './delete_all_anomalies';
|
||||
export * from './route_with_namespace';
|
||||
export * from './rules';
|
||||
export * from './tasks';
|
||||
export * from './wait_for';
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
export * from './indices_metadata';
|
||||
export * from './task_manager';
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { Client } from '@elastic/elasticsearch';
|
||||
|
||||
const DS_PREFIX = 'testing-datastream';
|
||||
const ILM_PREFIX = 'testing-ilm';
|
||||
|
||||
export const randomDatastream = async (es: Client, policyName?: string): Promise<string> => {
|
||||
const name = `${DS_PREFIX}-${Date.now()}`;
|
||||
|
||||
let settings = {};
|
||||
|
||||
if (policyName) {
|
||||
settings = {
|
||||
...settings,
|
||||
'index.lifecycle.name': policyName,
|
||||
};
|
||||
}
|
||||
|
||||
const indexTemplateBody = {
|
||||
index_patterns: [`${DS_PREFIX}-*`],
|
||||
data_stream: {},
|
||||
template: {
|
||||
settings,
|
||||
},
|
||||
};
|
||||
|
||||
await es.indices.putIndexTemplate({
|
||||
name: DS_PREFIX,
|
||||
body: indexTemplateBody,
|
||||
});
|
||||
|
||||
await es.indices.createDataStream({ name });
|
||||
|
||||
return name;
|
||||
};
|
||||
|
||||
export const randomIlmPolicy = async (es: Client): Promise<string> => {
|
||||
const name = `${ILM_PREFIX}-${Date.now()}`;
|
||||
|
||||
const policy = {
|
||||
phases: {
|
||||
hot: {
|
||||
actions: {
|
||||
rollover: {
|
||||
max_size: '50gb',
|
||||
max_age: '30d',
|
||||
},
|
||||
},
|
||||
},
|
||||
warm: {
|
||||
min_age: '30d',
|
||||
actions: {
|
||||
forcemerge: {
|
||||
max_num_segments: 1,
|
||||
},
|
||||
shrink: {
|
||||
number_of_shards: 1,
|
||||
},
|
||||
allocate: {
|
||||
number_of_replicas: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
delete: {
|
||||
min_age: '90d',
|
||||
actions: {
|
||||
delete: {},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
await es.ilm.putLifecycle({ name, policy });
|
||||
|
||||
return name;
|
||||
};
|
||||
|
||||
export const ensureBackingIndices = async (dsName: string, count: number, es: Client) => {
|
||||
const stats = await es.indices.dataStreamsStats({ name: dsName });
|
||||
if (stats.data_streams.length !== 1) {
|
||||
throw new Error('Data stream not found');
|
||||
}
|
||||
const current = stats.data_streams[0].backing_indices;
|
||||
|
||||
if (current < count) {
|
||||
for (let i = current; i < count; i++) {
|
||||
await es.indices.rollover({ alias: dsName });
|
||||
}
|
||||
} else if (current > count) {
|
||||
throw new Error('Cannot reduce the number of backing indices');
|
||||
}
|
||||
};
|
||||
|
||||
export const cleanupDatastreams = async (es: Client) => {
|
||||
await es.indices.deleteDataStream({ name: `${DS_PREFIX}*` });
|
||||
};
|
||||
|
||||
export const cleanupPolicies = async (es: Client) => {
|
||||
const policies = await es.ilm.getLifecycle({ name: `${ILM_PREFIX}*` });
|
||||
|
||||
await Promise.all(Object.entries(policies).map(([name, _]) => es.ilm.deleteLifecycle({ name })));
|
||||
};
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { TaskStatus } from '@kbn/task-manager-plugin/server';
|
||||
import { KbnClient } from '@kbn/test';
|
||||
import { ToolingLog } from '@kbn/tooling-log';
|
||||
|
||||
export const taskHasRun = async (taskId: string, kbn: KbnClient, after: Date): Promise<boolean> => {
|
||||
const task = await kbn.savedObjects.get({
|
||||
type: 'task',
|
||||
id: taskId,
|
||||
});
|
||||
|
||||
const runAt = new Date(task.attributes.runAt);
|
||||
const status = task.attributes.status;
|
||||
|
||||
return runAt > after && status === TaskStatus.Idle;
|
||||
};
|
||||
|
||||
export const launchTask = async (
|
||||
taskId: string,
|
||||
kbn: KbnClient,
|
||||
logger: ToolingLog,
|
||||
delayMillis: number = 1_000
|
||||
): Promise<Date> => {
|
||||
logger.info(`Launching task ${taskId}`);
|
||||
const task = await kbn.savedObjects.get({
|
||||
type: 'task',
|
||||
id: taskId,
|
||||
});
|
||||
|
||||
const runAt = new Date(Date.now() + delayMillis).toISOString();
|
||||
|
||||
await kbn.savedObjects.update({
|
||||
type: 'task',
|
||||
id: taskId,
|
||||
attributes: {
|
||||
...task.attributes,
|
||||
runAt,
|
||||
scheduledAt: runAt,
|
||||
status: TaskStatus.Idle,
|
||||
},
|
||||
});
|
||||
|
||||
logger.info(`Task ${taskId} launched`);
|
||||
|
||||
return new Date(runAt);
|
||||
};
|
|
@ -164,6 +164,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
'security-solution-ea-asset-criticality-ecs-migration',
|
||||
'security:endpoint-diagnostics',
|
||||
'security:endpoint-meta-telemetry',
|
||||
'security:indices-metadata-telemetry',
|
||||
'security:telemetry-configuration',
|
||||
'security:telemetry-detection-rules',
|
||||
'security:telemetry-diagnostic-timelines',
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import path from 'path';
|
||||
|
||||
import { CA_CERT_PATH } from '@kbn/dev-utils';
|
||||
import { FtrConfigProviderContext, kbnTestConfig, kibanaTestUser } from '@kbn/test';
|
||||
import { services as baseServices } from './services';
|
||||
|
@ -86,6 +88,11 @@ export function createTestConfig(options: CreateTestConfigOptions, testFiles?: s
|
|||
'riskScoringRoutesEnabled',
|
||||
'alertSuppressionForSequenceEqlRuleEnabled',
|
||||
])}`,
|
||||
`--plugin-path=${path.resolve(
|
||||
__dirname,
|
||||
'../../../../../test/analytics/plugins/analytics_ftr_helpers'
|
||||
)}`,
|
||||
|
||||
'--xpack.task_manager.poll_interval=1000',
|
||||
`--xpack.actions.preconfigured=${JSON.stringify(PRECONFIGURED_ACTION_CONNECTORS)}`,
|
||||
...(ssl
|
||||
|
|
|
@ -5,12 +5,14 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { KibanaEBTServerProvider } from '@kbn/test-suites-src/analytics/services/kibana_ebt';
|
||||
import { SecuritySolutionESSUtils } from '../services/security_solution_ess_utils';
|
||||
import { SpacesServiceProvider } from '../../../common/services/spaces';
|
||||
import { services as essServices } from '../../../api_integration/services';
|
||||
import { SecuritySolutionESSUtils } from '../services/security_solution_ess_utils';
|
||||
|
||||
export const services = {
|
||||
...essServices,
|
||||
spaces: SpacesServiceProvider,
|
||||
securitySolutionUtils: SecuritySolutionESSUtils,
|
||||
kibana_ebt_server: KibanaEBTServerProvider,
|
||||
};
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import path from 'path';
|
||||
|
||||
import { FtrConfigProviderContext } from '@kbn/test';
|
||||
import { services } from './services';
|
||||
import { PRECONFIGURED_ACTION_CONNECTORS } from '../shared';
|
||||
|
@ -13,6 +15,7 @@ export interface CreateTestConfigOptions {
|
|||
junit: { reportName: string };
|
||||
kbnTestServerArgs?: string[];
|
||||
kbnTestServerEnv?: Record<string, string>;
|
||||
suiteTags?: { include?: string[]; exclude?: string[] };
|
||||
}
|
||||
|
||||
export function createTestConfig(options: CreateTestConfigOptions) {
|
||||
|
@ -22,6 +25,7 @@ export function createTestConfig(options: CreateTestConfigOptions) {
|
|||
);
|
||||
return {
|
||||
...svlSharedConfig.getAll(),
|
||||
suiteTags: options.suiteTags,
|
||||
services: {
|
||||
...services,
|
||||
},
|
||||
|
@ -32,6 +36,10 @@ export function createTestConfig(options: CreateTestConfigOptions) {
|
|||
'--serverless=security',
|
||||
`--xpack.actions.preconfigured=${JSON.stringify(PRECONFIGURED_ACTION_CONNECTORS)}`,
|
||||
...(options.kbnTestServerArgs || []),
|
||||
`--plugin-path=${path.resolve(
|
||||
__dirname,
|
||||
'../../../../../test/analytics/plugins/analytics_ftr_helpers'
|
||||
)}`,
|
||||
],
|
||||
env: {
|
||||
...svlSharedConfig.get('kbnTestServer.env'),
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
import { SearchSecureService } from '@kbn/test-suites-serverless/shared/services/search_secure';
|
||||
import { services as serverlessServices } from '@kbn/test-suites-serverless/api_integration/services';
|
||||
import { KibanaEBTServerProvider } from '@kbn/test-suites-src/analytics/services/kibana_ebt';
|
||||
import { SpacesServiceProvider } from '../../../common/services/spaces';
|
||||
import { SecuritySolutionServerlessUtils } from '../services/security_solution_serverless_utils';
|
||||
import { SecuritySolutionServerlessSuperTest } from '../services/security_solution_serverless_supertest';
|
||||
|
@ -17,4 +18,5 @@ export const services = {
|
|||
secureSearch: SearchSecureService,
|
||||
securitySolutionUtils: SecuritySolutionServerlessUtils,
|
||||
supertest: SecuritySolutionServerlessSuperTest,
|
||||
kibana_ebt_server: KibanaEBTServerProvider,
|
||||
};
|
||||
|
|
|
@ -47,40 +47,70 @@ export default ({ getService }: FtrProviderContext) => {
|
|||
await retry.try(async () => {
|
||||
const stats = await getSecurityTelemetryStats(supertest, log);
|
||||
removeExtraFieldsFromTelemetryStats(stats);
|
||||
expect(stats).to.eql({
|
||||
detection_rules: [
|
||||
[
|
||||
{
|
||||
name: 'security:telemetry-detection-rules',
|
||||
passed: true,
|
||||
},
|
||||
],
|
||||
|
||||
expect(stats.detection_rules).to.eql([
|
||||
[
|
||||
{
|
||||
name: 'security:telemetry-detection-rules',
|
||||
passed: true,
|
||||
},
|
||||
],
|
||||
security_lists: [
|
||||
[
|
||||
{
|
||||
name: 'security:telemetry-lists',
|
||||
passed: true,
|
||||
},
|
||||
],
|
||||
]);
|
||||
|
||||
expect(stats.security_lists).to.eql([
|
||||
[
|
||||
{
|
||||
name: 'security:telemetry-lists',
|
||||
passed: true,
|
||||
},
|
||||
],
|
||||
endpoints: [
|
||||
[
|
||||
{
|
||||
name: 'security:endpoint-meta-telemetry',
|
||||
passed: true,
|
||||
},
|
||||
],
|
||||
]);
|
||||
|
||||
expect(stats.endpoints).to.eql([
|
||||
[
|
||||
{
|
||||
name: 'security:endpoint-meta-telemetry',
|
||||
passed: true,
|
||||
},
|
||||
],
|
||||
diagnostics: [
|
||||
[
|
||||
{
|
||||
name: 'security:endpoint-diagnostics',
|
||||
passed: true,
|
||||
},
|
||||
],
|
||||
]);
|
||||
|
||||
expect(stats.diagnostics).to.eql([
|
||||
[
|
||||
{
|
||||
name: 'security:endpoint-diagnostics',
|
||||
passed: true,
|
||||
},
|
||||
],
|
||||
});
|
||||
]);
|
||||
|
||||
expect(stats.indices_metadata).to.be.an('array');
|
||||
const events = stats.indices_metadata as any[];
|
||||
|
||||
expect(events).to.not.be.empty();
|
||||
|
||||
const eventTypes = events.map((e) => e.eventType);
|
||||
expect(eventTypes).to.contain('telemetry_index_stats_event');
|
||||
expect(eventTypes).to.contain('telemetry_data_stream_event');
|
||||
|
||||
const indicesStats = events.find((e) => e.eventType === 'telemetry_index_stats_event');
|
||||
expect(indicesStats).to.be.ok();
|
||||
expect(indicesStats.eventData).to.be.ok();
|
||||
expect(indicesStats.eventData.items).to.not.be.empty();
|
||||
expect(indicesStats.eventData.items[0]).to.have.keys(
|
||||
'index_name',
|
||||
'query_total',
|
||||
'query_time_in_millis',
|
||||
'docs_count',
|
||||
'docs_deleted',
|
||||
'docs_total_size_in_bytes'
|
||||
);
|
||||
|
||||
const dataStreamStats = events.find((e) => e.eventType === 'telemetry_data_stream_event');
|
||||
expect(dataStreamStats).to.be.ok();
|
||||
expect(dataStreamStats.eventData).to.be.ok();
|
||||
expect(dataStreamStats.eventData.items).to.not.be.empty();
|
||||
expect(dataStreamStats.eventData.items[0]).to.have.keys('datastream_name', 'indices');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -5,19 +5,29 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { unset } from 'lodash';
|
||||
|
||||
export const removeExtraFieldsFromTelemetryStats = (stats: any) => {
|
||||
Object.entries(stats).forEach(([, value]: [unknown, any]) => {
|
||||
value.forEach((entry: any, i: number) => {
|
||||
entry.forEach((_e: any, j: number) => {
|
||||
unset(value, `[${i}][${j}].time_executed_in_ms`);
|
||||
unset(value, `[${i}][${j}].start_time`);
|
||||
unset(value, `[${i}][${j}].end_time`);
|
||||
unset(value, `[${i}][${j}].cluster_uuid`);
|
||||
unset(value, `[${i}][${j}].cluster_name`);
|
||||
unset(value, `[${i}][${j}].license`);
|
||||
});
|
||||
});
|
||||
});
|
||||
removeExtraFields(stats, [
|
||||
'time_executed_in_ms',
|
||||
'start_time',
|
||||
'end_time',
|
||||
'cluster_uuid',
|
||||
'cluster_name',
|
||||
'license',
|
||||
]);
|
||||
};
|
||||
|
||||
function removeExtraFields(obj: any, fields: string[]): void {
|
||||
function traverseAndRemove(o: any): void {
|
||||
if (typeof o !== 'object' || o === null) return;
|
||||
|
||||
for (const key in o) {
|
||||
if (fields.includes(key)) {
|
||||
delete o[key];
|
||||
} else if (typeof o[key] === 'object') {
|
||||
traverseAndRemove(o[key]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
traverseAndRemove(obj);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { FtrConfigProviderContext } from '@kbn/test';
|
||||
|
||||
export default async function ({ readConfigFile }: FtrConfigProviderContext) {
|
||||
const functionalConfig = await readConfigFile(
|
||||
require.resolve('../../../config/ess/config.base.basic')
|
||||
);
|
||||
|
||||
return {
|
||||
...functionalConfig.getAll(),
|
||||
uiSettings: {},
|
||||
testFiles: [require.resolve('..')],
|
||||
junit: {
|
||||
reportName: 'Security Solution - Telemetry Integration Tests - ESS Env',
|
||||
},
|
||||
};
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { createTestConfig } from '../../../config/serverless/config.base';
|
||||
|
||||
export default createTestConfig({
|
||||
testFiles: [require.resolve('..')],
|
||||
suiteTags: { exclude: ['skipServerless'] },
|
||||
junit: {
|
||||
reportName: 'Security Solution - Telemetry Integration Tests - Serverless Env',
|
||||
},
|
||||
});
|
|
@ -0,0 +1,13 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
|
||||
export default ({ loadTestFile }: FtrProviderContext): void => {
|
||||
describe('Security Solution - Telemetry', function () {
|
||||
loadTestFile(require.resolve('./tasks/indices_metadata'));
|
||||
});
|
||||
};
|
|
@ -0,0 +1,175 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import {
|
||||
TELEMETRY_DATA_STREAM_EVENT,
|
||||
TELEMETRY_ILM_POLICY_EVENT,
|
||||
TELEMETRY_ILM_STATS_EVENT,
|
||||
TELEMETRY_INDEX_STATS_EVENT,
|
||||
} from '@kbn/security-solution-plugin/server/lib/telemetry/event_based/events';
|
||||
|
||||
import { FtrProviderContext } from '../../../ftr_provider_context';
|
||||
import {
|
||||
cleanupDatastreams,
|
||||
cleanupPolicies,
|
||||
ensureBackingIndices,
|
||||
launchTask,
|
||||
randomDatastream,
|
||||
randomIlmPolicy,
|
||||
taskHasRun,
|
||||
waitFor,
|
||||
} from '../../../../common/utils/security_solution';
|
||||
|
||||
const TASK_ID = 'security:indices-metadata-telemetry:1.0.0';
|
||||
const NUM_INDICES = 5;
|
||||
|
||||
export default ({ getService }: FtrProviderContext) => {
|
||||
const ebtServer = getService('kibana_ebt_server');
|
||||
const kibanaServer = getService('kibanaServer');
|
||||
const logger = getService('log');
|
||||
const es = getService('es');
|
||||
|
||||
describe('Indices metadata task telemetry', function () {
|
||||
let dsName: string;
|
||||
let policyName: string;
|
||||
|
||||
describe('@ess @serverless indices metadata', () => {
|
||||
beforeEach(async () => {
|
||||
dsName = await randomDatastream(es);
|
||||
await ensureBackingIndices(dsName, NUM_INDICES, es);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await cleanupDatastreams(es);
|
||||
});
|
||||
|
||||
it('should publish data stream events', async () => {
|
||||
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
|
||||
|
||||
const opts = {
|
||||
eventTypes: [TELEMETRY_DATA_STREAM_EVENT.eventType],
|
||||
withTimeoutMs: 1000,
|
||||
fromTimestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await waitFor(
|
||||
async () => {
|
||||
const events = await ebtServer
|
||||
.getEvents(Number.MAX_SAFE_INTEGER, opts)
|
||||
.then((result) => result.map((ev) => ev.properties.items))
|
||||
.then((result) => result.flat())
|
||||
.then((result) => result.filter((ev) => (ev as any).datastream_name === dsName));
|
||||
|
||||
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
|
||||
const eventCount = events.length;
|
||||
|
||||
return hasRun && eventCount === 1;
|
||||
},
|
||||
'waitForTaskToRun',
|
||||
logger
|
||||
);
|
||||
});
|
||||
|
||||
it('should publish index stats events', async () => {
|
||||
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
|
||||
|
||||
const opts = {
|
||||
eventTypes: [TELEMETRY_INDEX_STATS_EVENT.eventType],
|
||||
withTimeoutMs: 1000,
|
||||
fromTimestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
// .ds-<ds-name>-YYYY.MM.DD-NNNNNN
|
||||
const regex = new RegExp(`^\.ds-${dsName}-\\d{4}.\\d{2}.\\d{2}-\\d{6}$`);
|
||||
await waitFor(
|
||||
async () => {
|
||||
const events = await ebtServer
|
||||
.getEvents(Number.MAX_SAFE_INTEGER, opts)
|
||||
.then((result) => result.map((ev) => ev.properties.items))
|
||||
.then((result) => result.flat())
|
||||
.then((result) =>
|
||||
result.filter((ev) => regex.test((ev as any).index_name as string))
|
||||
);
|
||||
|
||||
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
|
||||
|
||||
return hasRun && events.length === NUM_INDICES;
|
||||
},
|
||||
'waitForTaskToRun',
|
||||
logger
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('@ess indices metadata', function () {
|
||||
this.tags('skipServerless');
|
||||
|
||||
beforeEach(async () => {
|
||||
policyName = await randomIlmPolicy(es);
|
||||
dsName = await randomDatastream(es, policyName);
|
||||
await ensureBackingIndices(dsName, NUM_INDICES, es);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await cleanupDatastreams(es);
|
||||
await cleanupPolicies(es);
|
||||
});
|
||||
|
||||
it('should publish ilm policy events', async () => {
|
||||
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
|
||||
|
||||
const opts = {
|
||||
eventTypes: [TELEMETRY_ILM_POLICY_EVENT.eventType],
|
||||
withTimeoutMs: 1000,
|
||||
fromTimestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await waitFor(
|
||||
async () => {
|
||||
const events = await ebtServer
|
||||
.getEvents(Number.MAX_SAFE_INTEGER, opts)
|
||||
.then((result) => result.map((ev) => ev.properties.items))
|
||||
.then((result) => result.flat())
|
||||
.then((result) => result.filter((ev) => (ev as any).policy_name === policyName));
|
||||
|
||||
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
|
||||
|
||||
return hasRun && events.length === 1;
|
||||
},
|
||||
'waitForTaskToRun',
|
||||
logger
|
||||
);
|
||||
});
|
||||
|
||||
it('should publish ilm stats events', async () => {
|
||||
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
|
||||
|
||||
const opts = {
|
||||
eventTypes: [TELEMETRY_ILM_STATS_EVENT.eventType],
|
||||
withTimeoutMs: 1000,
|
||||
fromTimestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await waitFor(
|
||||
async () => {
|
||||
const events = await ebtServer
|
||||
.getEvents(Number.MAX_SAFE_INTEGER, opts)
|
||||
.then((result) => result.map((ev) => ev.properties.items))
|
||||
.then((result) => result.flat())
|
||||
.then((result) => result.filter((ev) => (ev as any).policy_name === policyName));
|
||||
|
||||
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
|
||||
|
||||
return hasRun && events.length === NUM_INDICES;
|
||||
},
|
||||
'waitForTaskToRun',
|
||||
logger
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
|
@ -52,5 +52,6 @@
|
|||
"@kbn/ftr-common-functional-ui-services",
|
||||
"@kbn/spaces-plugin",
|
||||
"@kbn/elastic-assistant-plugin",
|
||||
"@kbn/test-suites-src",
|
||||
]
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue