mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
# Backport This will backport the following commits from `main` to `9.0`: - [[Security Solution][Telemetry] Add ingest pipelines stats task (#213435)](https://github.com/elastic/kibana/pull/213435) <!--- Backport version: 9.6.6 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) <!--BACKPORT [{"author":{"name":"Sebastián Zaffarano","email":"sebastian.zaffarano@elastic.co"},"sourceCommit":{"committedDate":"2025-03-21T13:38:58Z","message":"[Security Solution][Telemetry] Add ingest pipelines stats task (#213435)\n\n## Summary\n\nAdd a new telemetry task to the security solution plugin to collect\ningest pipeline stats. The new task runs once a day, calls the\n`_nodes/stats/ingest` API, and sends an EBT event with the following\ninformation:\n\n```js\nexport interface NodeIngestPipelinesStats {\n name: string;\n totals: Totals;\n pipelines: Pipeline[];\n}\n\nexport interface Pipeline {\n name: string;\n totals: Totals;\n processors: Processor[];\n}\n\nexport interface Processor {\n name: string;\n totals: Totals;\n}\n\nexport interface Totals {\n count: number;\n time_in_millis: number;\n current: number;\n failed: number;\n}\n```\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] [Flaky Test\nRunner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was\nused on any tests changed\n\n---------\n\nCo-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>\nCo-authored-by: Ryland Herrick <ryalnd@gmail.com>","sha":"9cf3bea759591738cec5847454d789d33ff1d859","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team: SecuritySolution","backport:prev-minor","backport:prev-major","ci:build-cloud-image","ci:cloud-deploy","v9.1.0"],"title":"[Security Solution][Telemetry] Add ingest pipelines stats task","number":213435,"url":"https://github.com/elastic/kibana/pull/213435","mergeCommit":{"message":"[Security Solution][Telemetry] Add ingest pipelines stats task (#213435)\n\n## Summary\n\nAdd a new telemetry task to the security solution plugin to collect\ningest pipeline stats. The new task runs once a day, calls the\n`_nodes/stats/ingest` API, and sends an EBT event with the following\ninformation:\n\n```js\nexport interface NodeIngestPipelinesStats {\n name: string;\n totals: Totals;\n pipelines: Pipeline[];\n}\n\nexport interface Pipeline {\n name: string;\n totals: Totals;\n processors: Processor[];\n}\n\nexport interface Processor {\n name: string;\n totals: Totals;\n}\n\nexport interface Totals {\n count: number;\n time_in_millis: number;\n current: number;\n failed: number;\n}\n```\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] [Flaky Test\nRunner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was\nused on any tests changed\n\n---------\n\nCo-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>\nCo-authored-by: Ryland Herrick <ryalnd@gmail.com>","sha":"9cf3bea759591738cec5847454d789d33ff1d859"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/213435","number":213435,"mergeCommit":{"message":"[Security Solution][Telemetry] Add ingest pipelines stats task (#213435)\n\n## Summary\n\nAdd a new telemetry task to the security solution plugin to collect\ningest pipeline stats. The new task runs once a day, calls the\n`_nodes/stats/ingest` API, and sends an EBT event with the following\ninformation:\n\n```js\nexport interface NodeIngestPipelinesStats {\n name: string;\n totals: Totals;\n pipelines: Pipeline[];\n}\n\nexport interface Pipeline {\n name: string;\n totals: Totals;\n processors: Processor[];\n}\n\nexport interface Processor {\n name: string;\n totals: Totals;\n}\n\nexport interface Totals {\n count: number;\n time_in_millis: number;\n current: number;\n failed: number;\n}\n```\n\n### Checklist\n\nCheck the PR satisfies following conditions. \n\nReviewers should verify this PR satisfies this list as well.\n\n- [x] [Unit or functional\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\nwere updated or added to match the most common scenarios\n- [x] [Flaky Test\nRunner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was\nused on any tests changed\n\n---------\n\nCo-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>\nCo-authored-by: Ryland Herrick <ryalnd@gmail.com>","sha":"9cf3bea759591738cec5847454d789d33ff1d859"}}]}] BACKPORT--> Co-authored-by: Sebastián Zaffarano <sebastian.zaffarano@elastic.co>
This commit is contained in:
parent
b6a0e0eee5
commit
5ac71db387
13 changed files with 501 additions and 2 deletions
|
@ -114,9 +114,13 @@ describe('configuration', () => {
|
|||
indices_metadata_config: {
|
||||
indices_threshold: getRandomInt(1, 100),
|
||||
datastreams_threshold: getRandomInt(1, 100),
|
||||
indices_settings_threshold: getRandomInt(1, 100),
|
||||
max_prefixes: getRandomInt(1, 100),
|
||||
max_group_size: getRandomInt(1, 100),
|
||||
},
|
||||
ingest_pipelines_stats_config: {
|
||||
enabled: false,
|
||||
},
|
||||
};
|
||||
|
||||
mockAxiosGet(mockedAxiosGet, [
|
||||
|
@ -147,6 +151,9 @@ describe('configuration', () => {
|
|||
expect(telemetryConfiguration.indices_metadata_config).toEqual(
|
||||
expected.indices_metadata_config
|
||||
);
|
||||
expect(telemetryConfiguration.ingest_pipelines_stats_config).toEqual(
|
||||
expected.ingest_pipelines_stats_config
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
import os from 'os';
|
||||
import type {
|
||||
IndicesMetadataConfiguration,
|
||||
IngestPipelinesStatsConfiguration,
|
||||
PaginationConfiguration,
|
||||
TelemetrySenderChannelConfiguration,
|
||||
} from './types';
|
||||
|
@ -28,11 +29,15 @@ class TelemetryConfigurationDTO {
|
|||
private readonly DEFAULT_INDICES_METADATA_CONFIG = {
|
||||
indices_threshold: 10000,
|
||||
datastreams_threshold: 1000,
|
||||
indices_settings_threshold: 10000,
|
||||
|
||||
max_prefixes: 10, // @deprecated
|
||||
max_group_size: 100, // @deprecated
|
||||
min_group_size: 5, // @deprecated
|
||||
};
|
||||
private readonly DEFAULT_INGEST_PIPELINES_STATS_CONFIG = {
|
||||
enabled: true,
|
||||
};
|
||||
|
||||
private _telemetry_max_buffer_size = this.DEFAULT_TELEMETRY_MAX_BUFFER_SIZE;
|
||||
private _max_security_list_telemetry_batch = this.DEFAULT_MAX_SECURITY_LIST_TELEMETRY_BATCH;
|
||||
|
@ -46,6 +51,8 @@ class TelemetryConfigurationDTO {
|
|||
private _pagination_config: PaginationConfiguration = this.DEFAULT_PAGINATION_CONFIG;
|
||||
private _indices_metadata_config: IndicesMetadataConfiguration =
|
||||
this.DEFAULT_INDICES_METADATA_CONFIG;
|
||||
private _ingest_pipelines_stats_config: IngestPipelinesStatsConfiguration =
|
||||
this.DEFAULT_INGEST_PIPELINES_STATS_CONFIG;
|
||||
|
||||
public get telemetry_max_buffer_size(): number {
|
||||
return this._telemetry_max_buffer_size;
|
||||
|
@ -111,14 +118,24 @@ class TelemetryConfigurationDTO {
|
|||
return this._pagination_config;
|
||||
}
|
||||
|
||||
public set indices_metadata_config(paginationConfiguration: IndicesMetadataConfiguration) {
|
||||
this._indices_metadata_config = paginationConfiguration;
|
||||
public set indices_metadata_config(indicesMetadataConfiguration: IndicesMetadataConfiguration) {
|
||||
this._indices_metadata_config = indicesMetadataConfiguration;
|
||||
}
|
||||
|
||||
public get indices_metadata_config(): IndicesMetadataConfiguration {
|
||||
return this._indices_metadata_config;
|
||||
}
|
||||
|
||||
public set ingest_pipelines_stats_config(
|
||||
ingestPipelinesStatsConfiguration: IngestPipelinesStatsConfiguration
|
||||
) {
|
||||
this._ingest_pipelines_stats_config = ingestPipelinesStatsConfiguration;
|
||||
}
|
||||
|
||||
public get ingest_pipelines_stats_config(): IngestPipelinesStatsConfiguration {
|
||||
return this._ingest_pipelines_stats_config;
|
||||
}
|
||||
|
||||
public resetAllToDefault() {
|
||||
this._telemetry_max_buffer_size = this.DEFAULT_TELEMETRY_MAX_BUFFER_SIZE;
|
||||
this._max_security_list_telemetry_batch = this.DEFAULT_MAX_SECURITY_LIST_TELEMETRY_BATCH;
|
||||
|
@ -128,6 +145,7 @@ class TelemetryConfigurationDTO {
|
|||
this._sender_channels = this.DEFAULT_SENDER_CHANNELS;
|
||||
this._pagination_config = this.DEFAULT_PAGINATION_CONFIG;
|
||||
this._indices_metadata_config = this.DEFAULT_INDICES_METADATA_CONFIG;
|
||||
this._ingest_pipelines_stats_config = this.DEFAULT_INGEST_PIPELINES_STATS_CONFIG;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import type {
|
|||
ResponseActionsApiCommandNames,
|
||||
} from '../../../../common/endpoint/service/response_actions/constants';
|
||||
import type { DataStreams, IlmPolicies, IlmsStats, IndicesStats } from '../indices.metadata.types';
|
||||
import type { NodeIngestPipelinesStats } from '../ingest_pipelines_stats.types';
|
||||
import { SiemMigrationsEventTypes } from './types';
|
||||
|
||||
export const RISK_SCORE_EXECUTION_SUCCESS_EVENT: EventTypeOpts<{
|
||||
|
@ -529,11 +530,130 @@ export const TELEMETRY_ILM_STATS_EVENT: EventTypeOpts<IlmsStats> = {
|
|||
},
|
||||
};
|
||||
|
||||
export const TELEMETRY_NODE_INGEST_PIPELINES_STATS_EVENT: EventTypeOpts<NodeIngestPipelinesStats> =
|
||||
{
|
||||
eventType: 'telemetry_node_ingest_pipelines_stats_event',
|
||||
schema: {
|
||||
name: {
|
||||
type: 'keyword',
|
||||
_meta: { description: 'The name of the node' },
|
||||
},
|
||||
pipelines: {
|
||||
type: 'array',
|
||||
items: {
|
||||
properties: {
|
||||
name: {
|
||||
type: 'keyword',
|
||||
_meta: { description: 'The name of the pipeline.' },
|
||||
},
|
||||
totals: {
|
||||
properties: {
|
||||
count: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description:
|
||||
'Total number of documents ingested during the lifetime of this node.',
|
||||
},
|
||||
},
|
||||
time_in_millis: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description: 'Ingestion elapsed time during the lifetime of this node.',
|
||||
},
|
||||
},
|
||||
current: {
|
||||
type: 'long',
|
||||
_meta: { description: 'Total number of documents currently being ingested.' },
|
||||
},
|
||||
failed: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description:
|
||||
'Total number of failed ingest operations during the lifetime of this node.',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
processors: {
|
||||
type: 'array',
|
||||
items: {
|
||||
properties: {
|
||||
name: {
|
||||
type: 'keyword',
|
||||
_meta: { description: 'The name of the pipeline.' },
|
||||
},
|
||||
totals: {
|
||||
properties: {
|
||||
count: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description:
|
||||
'Total number of documents ingested during the lifetime of this node.',
|
||||
},
|
||||
},
|
||||
time_in_millis: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description: 'Ingestion elapsed time during the lifetime of this node.',
|
||||
},
|
||||
},
|
||||
current: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description: 'Total number of documents currently being ingested.',
|
||||
},
|
||||
},
|
||||
failed: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description:
|
||||
'Total number of failed ingest operations during the lifetime of this node.',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: { description: 'Datastreams' },
|
||||
},
|
||||
},
|
||||
},
|
||||
_meta: { description: 'Datastreams' },
|
||||
},
|
||||
},
|
||||
totals: {
|
||||
properties: {
|
||||
count: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description: 'Total number of documents ingested during the lifetime of this node.',
|
||||
},
|
||||
},
|
||||
time_in_millis: {
|
||||
type: 'long',
|
||||
_meta: { description: 'Ingestion elapsed time during the lifetime of this node.' },
|
||||
},
|
||||
current: {
|
||||
type: 'long',
|
||||
_meta: { description: 'Total number of documents currently being ingested.' },
|
||||
},
|
||||
failed: {
|
||||
type: 'long',
|
||||
_meta: {
|
||||
description:
|
||||
'Total number of failed ingest operations during the lifetime of this node.',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
interface CreateAssetCriticalityProcessedFileEvent {
|
||||
result?: BulkUpsertAssetCriticalityRecordsResponse['stats'];
|
||||
startTime: Date;
|
||||
endTime: Date;
|
||||
}
|
||||
|
||||
export const createAssetCriticalityProcessedFileEvent = ({
|
||||
result,
|
||||
startTime,
|
||||
|
@ -1046,6 +1166,7 @@ export const events = [
|
|||
TELEMETRY_DATA_STREAM_EVENT,
|
||||
TELEMETRY_ILM_POLICY_EVENT,
|
||||
TELEMETRY_ILM_STATS_EVENT,
|
||||
TELEMETRY_NODE_INGEST_PIPELINES_STATS_EVENT,
|
||||
TELEMETRY_INDEX_STATS_EVENT,
|
||||
SIEM_MIGRATIONS_MIGRATION_SUCCESS,
|
||||
SIEM_MIGRATIONS_MIGRATION_FAILURE,
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
export interface NodeIngestPipelinesStats {
|
||||
name: string;
|
||||
totals: Totals;
|
||||
pipelines: Pipeline[];
|
||||
}
|
||||
|
||||
export interface Pipeline {
|
||||
name: string;
|
||||
totals: Totals;
|
||||
processors: Processor[];
|
||||
}
|
||||
|
||||
export interface Processor {
|
||||
name: string;
|
||||
totals: Totals;
|
||||
}
|
||||
|
||||
export interface Totals {
|
||||
count: number;
|
||||
time_in_millis: number;
|
||||
current: number;
|
||||
failed: number;
|
||||
}
|
|
@ -29,6 +29,8 @@ import type {
|
|||
IndicesStatsRequest,
|
||||
IlmGetLifecycleRequest,
|
||||
IndicesGetRequest,
|
||||
NodesStatsRequest,
|
||||
Duration,
|
||||
} from '@elastic/elasticsearch/lib/api/types';
|
||||
import { ENDPOINT_ARTIFACT_LISTS } from '@kbn/securitysolution-list-constants';
|
||||
import {
|
||||
|
@ -100,6 +102,12 @@ import type {
|
|||
IndexStats,
|
||||
} from './indices.metadata.types';
|
||||
import { chunkStringsByMaxLength } from './collections_helpers';
|
||||
import type {
|
||||
NodeIngestPipelinesStats,
|
||||
Pipeline,
|
||||
Processor,
|
||||
Totals,
|
||||
} from './ingest_pipelines_stats.types';
|
||||
|
||||
export interface ITelemetryReceiver {
|
||||
start(
|
||||
|
@ -257,6 +265,8 @@ export interface ITelemetryReceiver {
|
|||
getIndicesStats(indices: string[]): AsyncGenerator<IndexStats, void, unknown>;
|
||||
getIlmsStats(indices: string[]): AsyncGenerator<IlmStats, void, unknown>;
|
||||
getIlmsPolicies(ilms: string[]): AsyncGenerator<IlmPolicy, void, unknown>;
|
||||
|
||||
getIngestPipelinesStats(timeout: Duration): Promise<NodeIngestPipelinesStats[]>;
|
||||
}
|
||||
|
||||
export class TelemetryReceiver implements ITelemetryReceiver {
|
||||
|
@ -1514,4 +1524,76 @@ export class TelemetryReceiver implements ITelemetryReceiver {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async getIngestPipelinesStats(timeout: Duration): Promise<NodeIngestPipelinesStats[]> {
|
||||
const es = this.esClient();
|
||||
|
||||
this.logger.l('Fetching ingest pipelines stats');
|
||||
|
||||
const request: NodesStatsRequest = {
|
||||
metric: 'ingest',
|
||||
filter_path: [
|
||||
'nodes.*.ingest.total',
|
||||
'nodes.*.ingest.pipelines.*.count',
|
||||
'nodes.*.ingest.pipelines.*.time_in_millis',
|
||||
'nodes.*.ingest.pipelines.*.failed',
|
||||
'nodes.*.ingest.pipelines.*.current',
|
||||
'nodes.*.ingest.pipelines.*.processors.*.stats.count',
|
||||
'nodes.*.ingest.pipelines.*.processors.*.stats.time_in_millis',
|
||||
'nodes.*.ingest.pipelines.*.processors.*.stats.failed',
|
||||
'nodes.*.ingest.pipelines.*.processors.*.stats.current',
|
||||
],
|
||||
timeout,
|
||||
};
|
||||
|
||||
return es.nodes
|
||||
.stats(request)
|
||||
.then((response) => {
|
||||
return Object.entries(response.nodes).map(([nodeName, node]) => {
|
||||
return {
|
||||
name: nodeName,
|
||||
totals: {
|
||||
count: node.ingest?.total?.count ?? 0,
|
||||
time_in_millis: node.ingest?.total?.time_in_millis ?? 0,
|
||||
current: node.ingest?.total?.current ?? 0,
|
||||
failed: node.ingest?.total?.failed ?? 0,
|
||||
} as Totals,
|
||||
pipelines: Object.entries(node.ingest?.pipelines ?? []).map(
|
||||
([pipelineName, pipeline]) => {
|
||||
return {
|
||||
name: pipelineName,
|
||||
totals: {
|
||||
count: pipeline.count,
|
||||
time_in_millis: pipeline.time_in_millis,
|
||||
current: pipeline.current,
|
||||
failed: pipeline.failed,
|
||||
} as Totals,
|
||||
processors: (pipeline.processors ?? [])
|
||||
.map((processors) => {
|
||||
return Object.entries(processors).map(([processorName, processor]) => {
|
||||
return {
|
||||
name: processorName,
|
||||
totals: {
|
||||
count: processor.stats?.count ?? 0,
|
||||
time_in_millis: processor.stats?.time_in_millis ?? 0,
|
||||
current: processor.stats?.current ?? 0,
|
||||
failed: processor.stats?.failed ?? 0,
|
||||
} as Totals,
|
||||
} as Processor;
|
||||
});
|
||||
})
|
||||
.flat(),
|
||||
} as Pipeline;
|
||||
}
|
||||
),
|
||||
} as NodeIngestPipelinesStats;
|
||||
});
|
||||
})
|
||||
.catch((error) => {
|
||||
this.logger.warn('Error fetching ingest pipelines stats', {
|
||||
error_message: error,
|
||||
} as LogMeta);
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -112,6 +112,12 @@ export function createTelemetryConfigurationTaskConfig() {
|
|||
telemetryConfiguration.indices_metadata_config = configArtifact.indices_metadata_config;
|
||||
}
|
||||
|
||||
if (configArtifact.ingest_pipelines_stats_config) {
|
||||
log.l('Updating ingest pipelines stats configuration');
|
||||
telemetryConfiguration.ingest_pipelines_stats_config =
|
||||
configArtifact.ingest_pipelines_stats_config;
|
||||
}
|
||||
|
||||
await taskMetricsService.end(trace);
|
||||
|
||||
log.l('Updated TelemetryConfiguration', { configuration: telemetryConfiguration });
|
||||
|
|
|
@ -17,6 +17,7 @@ import { createTelemetryConfigurationTaskConfig } from './configuration';
|
|||
import { telemetryConfiguration } from '../configuration';
|
||||
import { createTelemetryFilterListArtifactTaskConfig } from './filterlists';
|
||||
import { createTelemetryIndicesMetadataTaskConfig } from './indices.metadata';
|
||||
import { createIngestStatsTaskConfig } from './ingest_pipelines_stats';
|
||||
|
||||
export function createTelemetryTaskConfigs(): SecurityTelemetryTaskConfig[] {
|
||||
return [
|
||||
|
@ -32,5 +33,6 @@ export function createTelemetryTaskConfigs(): SecurityTelemetryTaskConfig[] {
|
|||
createTelemetryConfigurationTaskConfig(),
|
||||
createTelemetryFilterListArtifactTaskConfig(),
|
||||
createTelemetryIndicesMetadataTaskConfig(),
|
||||
createIngestStatsTaskConfig(),
|
||||
];
|
||||
}
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { LogMeta, Logger } from '@kbn/core/server';
|
||||
import type { ITelemetryEventsSender } from '../sender';
|
||||
import type { ITelemetryReceiver } from '../receiver';
|
||||
import type { TaskExecutionPeriod } from '../task';
|
||||
import type { ITaskMetricsService } from '../task_metrics.types';
|
||||
import { TelemetryCounter } from '../types';
|
||||
import {
|
||||
createUsageCounterLabel,
|
||||
getPreviousDailyTaskTimestamp,
|
||||
newTelemetryLogger,
|
||||
} from '../helpers';
|
||||
import { TELEMETRY_NODE_INGEST_PIPELINES_STATS_EVENT } from '../event_based/events';
|
||||
import { telemetryConfiguration } from '../configuration';
|
||||
|
||||
const COUNTER_LABELS = ['security_solution', 'pipelines-stats'];
|
||||
|
||||
export function createIngestStatsTaskConfig() {
|
||||
const taskType = 'security:ingest-pipelines-stats-telemetry';
|
||||
return {
|
||||
type: taskType,
|
||||
title: 'Security Solution Telemetry Ingest Pipelines Stats task',
|
||||
interval: '24h',
|
||||
timeout: '5m',
|
||||
version: '1.0.0',
|
||||
getLastExecutionTime: getPreviousDailyTaskTimestamp,
|
||||
runTask: async (
|
||||
taskId: string,
|
||||
logger: Logger,
|
||||
receiver: ITelemetryReceiver,
|
||||
sender: ITelemetryEventsSender,
|
||||
taskMetricsService: ITaskMetricsService,
|
||||
taskExecutionPeriod: TaskExecutionPeriod
|
||||
) => {
|
||||
const mdc = { task_id: taskId, task_execution_period: taskExecutionPeriod };
|
||||
const log = newTelemetryLogger(logger.get('indices-metadata'), mdc);
|
||||
const trace = taskMetricsService.start(taskType);
|
||||
|
||||
const taskConfig = telemetryConfiguration.ingest_pipelines_stats_config;
|
||||
|
||||
const start = performance.now();
|
||||
|
||||
try {
|
||||
logger.info('Running ingest stats task');
|
||||
|
||||
if (!taskConfig.enabled) {
|
||||
logger.info('Ingest stats task is disabled, skipping');
|
||||
await taskMetricsService.end(trace);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const ingestStats = await receiver.getIngestPipelinesStats('3m');
|
||||
|
||||
logger.info('Got ingest stats, about to publish EBT events', {
|
||||
count: ingestStats.length,
|
||||
} as LogMeta);
|
||||
|
||||
ingestStats.forEach((stats) => {
|
||||
sender.reportEBT(TELEMETRY_NODE_INGEST_PIPELINES_STATS_EVENT, stats);
|
||||
});
|
||||
|
||||
const telemetryUsageCounter = sender.getTelemetryUsageCluster();
|
||||
|
||||
telemetryUsageCounter?.incrementCounter({
|
||||
counterName: createUsageCounterLabel(COUNTER_LABELS.concat('events')),
|
||||
counterType: TelemetryCounter.DOCS_SENT,
|
||||
incrementBy: ingestStats.length,
|
||||
});
|
||||
|
||||
await taskMetricsService.end(trace);
|
||||
|
||||
log.info('Ingest stats task completed', {
|
||||
count: ingestStats.length,
|
||||
elapsed: performance.now() - start,
|
||||
} as LogMeta);
|
||||
|
||||
return ingestStats.length;
|
||||
} catch (err) {
|
||||
log.warn(`Error running ingest stats task`, {
|
||||
error: err.message,
|
||||
elapsed: performance.now() - start,
|
||||
} as LogMeta);
|
||||
await taskMetricsService.end(trace, err);
|
||||
return 0;
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
|
@ -482,16 +482,22 @@ export interface TelemetryConfiguration {
|
|||
};
|
||||
pagination_config?: PaginationConfiguration;
|
||||
indices_metadata_config?: IndicesMetadataConfiguration;
|
||||
ingest_pipelines_stats_config?: IngestPipelinesStatsConfiguration;
|
||||
}
|
||||
|
||||
export interface IndicesMetadataConfiguration {
|
||||
indices_threshold: number;
|
||||
datastreams_threshold: number;
|
||||
indices_settings_threshold: number;
|
||||
max_prefixes: number;
|
||||
max_group_size: number;
|
||||
min_group_size: number;
|
||||
}
|
||||
|
||||
export interface IngestPipelinesStatsConfiguration {
|
||||
enabled: boolean;
|
||||
}
|
||||
|
||||
export interface PaginationConfiguration {
|
||||
max_page_size_bytes: number;
|
||||
num_docs_to_sample: number;
|
||||
|
|
|
@ -7,9 +7,39 @@
|
|||
|
||||
import { Client } from '@elastic/elasticsearch';
|
||||
|
||||
const INGEST_PIPELINE_PREFIX = 'testing-ingest-pipeline';
|
||||
const DS_PREFIX = 'testing-datastream';
|
||||
const ILM_PREFIX = 'testing-ilm';
|
||||
|
||||
export const randomIngestPipeline = async (es: Client): Promise<string> => {
|
||||
const id = `${INGEST_PIPELINE_PREFIX}-${Date.now()}`;
|
||||
|
||||
await es.ingest.putPipeline({
|
||||
id,
|
||||
processors: [
|
||||
{
|
||||
set: {
|
||||
field: 'message',
|
||||
value: 'changed',
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
return id;
|
||||
};
|
||||
|
||||
export const indexRandomData = async (es: Client, dsName: string, ingestPipeline: string) => {
|
||||
await es.index({
|
||||
index: dsName,
|
||||
pipeline: ingestPipeline,
|
||||
document: {
|
||||
'@timestamp': new Date(),
|
||||
key: `value-${Date.now()}`,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
export const randomDatastream = async (es: Client, policyName?: string): Promise<string> => {
|
||||
const name = `${DS_PREFIX}-${Date.now()}`;
|
||||
|
||||
|
@ -101,6 +131,10 @@ export const cleanupDatastreams = async (es: Client) => {
|
|||
await es.indices.deleteDataStream({ name: `${DS_PREFIX}*` });
|
||||
};
|
||||
|
||||
export const cleanupIngestPipelines = async (es: Client) => {
|
||||
es.ingest.deletePipeline({ id: `${INGEST_PIPELINE_PREFIX}*` });
|
||||
};
|
||||
|
||||
export const cleanupPolicies = async (es: Client) => {
|
||||
const policies = await es.ilm.getLifecycle({ name: `${ILM_PREFIX}*` });
|
||||
|
||||
|
|
|
@ -166,6 +166,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
'security:endpoint-diagnostics',
|
||||
'security:endpoint-meta-telemetry',
|
||||
'security:indices-metadata-telemetry',
|
||||
'security:ingest-pipelines-stats-telemetry',
|
||||
'security:telemetry-configuration',
|
||||
'security:telemetry-detection-rules',
|
||||
'security:telemetry-diagnostic-timelines',
|
||||
|
|
|
@ -9,6 +9,7 @@ import { FtrProviderContext } from '../../ftr_provider_context';
|
|||
export default ({ loadTestFile }: FtrProviderContext): void => {
|
||||
describe('Security Solution - Telemetry', function () {
|
||||
loadTestFile(require.resolve('./tasks/indices_metadata'));
|
||||
loadTestFile(require.resolve('./tasks/ingest_pipeline_stats'));
|
||||
loadTestFile(require.resolve('./tasks/endpoint'));
|
||||
});
|
||||
};
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { FtrProviderContext } from '../../../ftr_provider_context';
|
||||
import {
|
||||
cleanupDatastreams,
|
||||
cleanupIngestPipelines,
|
||||
indexRandomData,
|
||||
launchTask,
|
||||
randomDatastream,
|
||||
randomIngestPipeline,
|
||||
taskHasRun,
|
||||
waitFor,
|
||||
} from '../../../../common/utils/security_solution';
|
||||
|
||||
const TASK_ID = 'security:ingest-pipelines-stats-telemetry:1.0.0';
|
||||
const INGEST_PIPELINES_STATS_EBT = 'telemetry_node_ingest_pipelines_stats_event';
|
||||
|
||||
export default ({ getService }: FtrProviderContext) => {
|
||||
const ebtServer = getService('kibana_ebt_server');
|
||||
const kibanaServer = getService('kibanaServer');
|
||||
const logger = getService('log');
|
||||
const es = getService('es');
|
||||
|
||||
describe('Security Telemetry - Ingest pipeline stats task.', function () {
|
||||
let datastream: string;
|
||||
let pipeline: string;
|
||||
|
||||
describe('@ess @serverless indices metadata', () => {
|
||||
beforeEach(async () => {
|
||||
datastream = await randomDatastream(es);
|
||||
pipeline = await randomIngestPipeline(es);
|
||||
|
||||
await indexRandomData(es, datastream, pipeline);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await cleanupDatastreams(es);
|
||||
await cleanupIngestPipelines(es);
|
||||
});
|
||||
|
||||
it('should publish events when scheduled', async () => {
|
||||
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
|
||||
|
||||
const opts = {
|
||||
eventTypes: [INGEST_PIPELINES_STATS_EBT],
|
||||
withTimeoutMs: 1000,
|
||||
fromTimestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await waitFor(
|
||||
async () => {
|
||||
const events = await ebtServer.getEvents(Number.MAX_SAFE_INTEGER, opts);
|
||||
|
||||
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
|
||||
const eventCount = events.length;
|
||||
|
||||
return hasRun && eventCount >= 0;
|
||||
},
|
||||
'waitForTaskToRun',
|
||||
logger
|
||||
);
|
||||
});
|
||||
|
||||
it('should publish events for a new pipeline', async () => {
|
||||
const runAt = await launchTask(TASK_ID, kibanaServer, logger);
|
||||
|
||||
const opts = {
|
||||
eventTypes: [INGEST_PIPELINES_STATS_EBT],
|
||||
withTimeoutMs: 1000,
|
||||
fromTimestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await waitFor(
|
||||
async () => {
|
||||
const events = await ebtServer
|
||||
.getEvents(Number.MAX_SAFE_INTEGER, opts)
|
||||
.then((result) => result.map((ev) => ev.properties.pipelines))
|
||||
.then((result) => result.flat())
|
||||
.then((result) => result.filter((ev) => (ev as any).name === pipeline));
|
||||
|
||||
const hasRun = await taskHasRun(TASK_ID, kibanaServer, runAt);
|
||||
const eventCount = events.length;
|
||||
|
||||
return hasRun && eventCount >= 1;
|
||||
},
|
||||
'waitForTaskToRun',
|
||||
logger
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
Loading…
Add table
Add a link
Reference in a new issue