Mirror of https://github.com/elastic/kibana.git, synced 2025-04-24 17:59:23 -04:00
[Logs Data Telemetry] Create background job to collect and send logs data telemetry (#189380)
## Summary
The PR creates a service which runs in the background as a Kibana task and
lazily collects and processes logs data telemetry events. The implementation
collects the data by reading indices info and prepares the telemetry events.
These events will be reported to stack telemetry in follow-up PRs.
The service groups the stats per
[pattern_name](1116ac6daa/src/plugins/telemetry/server/telemetry_collection/get_data_telemetry/constants.ts (L42))
and gathers the following information:
- Docs and indices count (regular and failure store)
- Count of unique namespaces found in data streams matching a pattern
- Size of the documents (regular only)
- Meta information (managed by, package name, and whether beat information is
found in mappings)
- Total field count and count of individual log-centric fields
- Count of docs corresponding to each structure level
The service gathers the data stream information and mappings and generates
events in the following manner:
```json
[
{
"pattern_name": "heartbeat",
"shipper": "heartbeat",
"doc_count": 9239,
"structure_level": {
"5": 9239
},
"index_count": 1,
"failure_store_doc_count": 9239,
"failure_store_index_count": 1,
"namespace_count": 0,
"field_count": 1508,
"field_existence": {
"container.id": 9239,
"log.level": 9239,
"container.name": 9239,
"host.name": 9239,
"host.hostname": 9239,
"kubernetes.pod.name": 9239,
"kubernetes.pod.uid": 9239,
"cloud.provider": 9239,
"agent.type": 9239,
"event.dataset": 9239,
"event.category": 9239,
"event.module": 9239,
"service.name": 9239,
"service.type": 9239,
"service.version": 9239,
"message": 9239,
"event.original": 9239,
"error.message": 9239,
"@timestamp": 9239,
"data_stream.dataset": 9239,
"data_stream.namespace": 9239,
"data_stream.type": 9239
},
"size_in_bytes": 12382655,
"managed_by": [],
"package_name": [],
"beat": [
"heartbeat"
]
},
{
"pattern_name": "nginx",
"doc_count": 10080,
"structure_level": {
"6": 10080
},
"index_count": 1,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"namespace_count": 1,
"field_count": 1562,
"field_existence": {
"container.id": 10080,
"log.level": 10080,
"host.name": 10080,
"kubernetes.pod.uid": 10080,
"cloud.provider": 10080,
"event.dataset": 10080,
"service.name": 10080,
"message": 10080,
"@timestamp": 10080,
"data_stream.dataset": 10080,
"data_stream.namespace": 10080,
"data_stream.type": 10080
},
"size_in_bytes": 12098071,
"managed_by": [],
"package_name": [],
"beat": []
},
{
"pattern_name": "apache",
"doc_count": 1439,
"structure_level": {
"6": 1439
},
"index_count": 1,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"namespace_count": 2,
"field_count": 1562,
"field_existence": {
"container.id": 1439,
"log.level": 1439,
"host.name": 1439,
"kubernetes.pod.uid": 1439,
"cloud.provider": 1439,
"event.dataset": 1439,
"service.name": 1439,
"message": 1439,
"@timestamp": 1439,
"data_stream.dataset": 1439,
"data_stream.namespace": 1439,
"data_stream.type": 1439
},
"size_in_bytes": 4425502,
"managed_by": [],
"package_name": [],
"beat": []
},
{
"pattern_name": "generic-logs",
"doc_count": 106659,
"structure_level": {
"2": 100907,
"3": 5752
},
"index_count": 6,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"namespace_count": 2,
"field_count": 1581,
"field_existence": {
"log.level": 106659,
"host.name": 106659,
"service.name": 106659,
"@timestamp": 106659,
"data_stream.dataset": 106659,
"data_stream.namespace": 106659,
"data_stream.type": 106659,
"container.id": 5752,
"kubernetes.pod.uid": 5752,
"cloud.provider": 5752,
"event.dataset": 5752,
"message": 5752
},
"size_in_bytes": 29752097,
"managed_by": [],
"package_name": [],
"beat": []
}
]
```
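
For reference, each event in the array above roughly corresponds to the TypeScript shape below. This is a sketch inferred from the example payload and from the fields produced by `indexStatsToTelemetryEvents` in the diff; the actual `DataTelemetryEvent` type lives in the plugin's `types.ts`, which is not part of this excerpt, so the interface name and comments here are illustrative.

```ts
// Sketch of the per-pattern telemetry event shape, inferred from the example
// payload above; the real DataTelemetryEvent type is defined in the plugin's types.ts.
interface DataTelemetryEventSketch {
  pattern_name: string; // e.g. 'heartbeat', 'nginx', 'generic-logs'
  shipper?: string; // present when the pattern maps to a known shipper
  doc_count: number; // primary docs across matching indices/data streams
  structure_level: Record<string, number>; // structure level -> doc count
  index_count: number;
  failure_store_doc_count: number;
  failure_store_index_count: number;
  namespace_count: number; // unique namespaces seen in matching data streams
  field_count: number; // total unique fields found in mappings
  field_existence: Record<string, number>; // field -> docs in indices that map it
  size_in_bytes: number; // primary store size
  managed_by: string[]; // e.g. ['fleet']
  package_name: string[]; // integration package names
  beat: string[]; // beat name when found in mappings
}
```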
---------
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent 1b32603f44
commit 13736730d1

12 changed files with 1848 additions and 6 deletions
```diff
@@ -55,6 +55,7 @@ export const DATA_DATASETS_INDEX_PATTERNS = [
   { pattern: 'telegraf*', patternName: 'telegraf' },
   { pattern: 'prometheusbeat*', patternName: 'prometheusbeat' },
   { pattern: 'fluentbit*', patternName: 'fluentbit' },
   { pattern: 'fluent-bit*', patternName: 'fluentbit' },
   { pattern: '*nginx*', patternName: 'nginx' },
   { pattern: '*apache*', patternName: 'apache' }, // Already in Security (keeping it in here for documentation)
+  { pattern: '*logs*', patternName: 'generic-logs' },
```

```diff
@@ -23,9 +23,10 @@
     "fieldFormats",
     "dataViews",
     "lens",
-    "fieldsMetadata"
+    "fieldsMetadata",
+    "taskManager"
   ],
-  "optionalPlugins": [],
+  "optionalPlugins": ["telemetry"],
   "requiredBundles": ["unifiedHistogram", "discover"],
   "extraPublicDirs": [
     "common"
```

```diff
@@ -5,8 +5,9 @@
  * 2.0.
  */
 
-import { CoreSetup, Logger, Plugin, PluginInitializerContext } from '@kbn/core/server';
+import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from '@kbn/core/server';
 import { mapValues } from 'lodash';
+import { DataTelemetryService } from './services';
 import { getDatasetQualityServerRouteRepository } from './routes';
 import { registerRoutes } from './routes/register_routes';
 import { DatasetQualityRouteHandlerResources } from './routes/types';
@@ -18,9 +19,11 @@ import {
 
 export class DatasetQualityServerPlugin implements Plugin {
   private readonly logger: Logger;
+  private readonly dataTelemetryService: DataTelemetryService;
 
   constructor(initializerContext: PluginInitializerContext) {
     this.logger = initializerContext.logger.get();
+    this.dataTelemetryService = new DataTelemetryService(this.logger);
   }
 
   setup(
@@ -53,10 +56,18 @@ export class DatasetQualityServerPlugin implements Plugin {
       getEsCapabilities,
     });
 
+    // Setup Data Telemetry Service
+    this.dataTelemetryService.setup(core.analytics, plugins.taskManager);
+
     return {};
   }
 
-  start() {
+  start(core: CoreStart, plugins: DatasetQualityPluginStartDependencies) {
+    // Start Data Telemetry Service
+    this.dataTelemetryService.start(plugins.telemetry, core, plugins.taskManager).catch((error) => {
+      this.logger.error(`[Data Telemetry Service]: ${error}`);
+    });
+
     return {};
   }
 }
```

@@ -0,0 +1,102 @@
```ts
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { DATA_DATASETS_INDEX_PATTERNS_UNIQUE } from '@kbn/telemetry-plugin/server/telemetry_collection/get_data_telemetry/constants';

import { DatasetIndexPattern } from './types';

export const LOGS_DATA_TELEMETRY_TASK_TYPE = 'logs-data-telemetry';
export const LOGS_DATA_TELEMETRY_TASK_ID = 'logs-data-telemetry:collect-and-report-task-2';

export const TELEMETRY_TASK_INTERVAL = 24 * 60; // 24 hours (in minutes)
export const TELEMETRY_TASK_TIMEOUT = 10; // 10 minutes

export const BREATHE_DELAY_SHORT = 1000; // 1 second
export const BREATHE_DELAY_MEDIUM = 5 * 1000; // 5 seconds

export const MAX_STREAMS_TO_REPORT = 1000;

export const NON_LOG_SIGNALS = ['metrics', 'traces', 'internal', 'synthetics'];
export const EXCLUDE_ELASTIC_LOGS = ['logs-synth', 'logs-elastic', 'logs-endpoint'];

export const TELEMETRY_CHANNEL = 'logs-data-telemetry';

const LOGS_INDEX_PATTERN_NAMES = [
  'filebeat',
  'generic-filebeat',
  'metricbeat',
  'generic-metricbeat',
  'apm',
  'functionbeat',
  'generic-functionbeat',
  'heartbeat',
  'generic-heartbeat',
  'logstash',
  'generic-logstash',
  'fluentd',
  'telegraf',
  'prometheusbeat',
  'fluentbit',
  'nginx',
  'apache',
  'generic-logs',
];

const TELEMETRY_PATTERNS_BY_NAME = DATA_DATASETS_INDEX_PATTERNS_UNIQUE.reduce((acc, pattern) => {
  acc[pattern.patternName] = [pattern, ...(acc[pattern.patternName] || [])];
  return acc;
}, {} as Record<string, DatasetIndexPattern[]>);

export const LOGS_DATASET_INDEX_PATTERNS = LOGS_INDEX_PATTERN_NAMES.flatMap<DatasetIndexPattern>(
  (patternName) => TELEMETRY_PATTERNS_BY_NAME[patternName] || []
);

export const LEVEL_2_RESOURCE_FIELDS = [
  'host.name',
  'service.name',
  'host',
  'hostname',
  'host_name',
];

export const PROMINENT_LOG_ECS_FIELDS = [
  'log.level',
  'log.logger',
  'log.origin.file.name',
  'log.origin.function',
  'log.origin.file.line',
  'event.action',
  'event.category',
  'event.dataset',
  'event.kind',
  'log.file.path',
];

export const DATA_TELEMETRY_FIELDS = [
  'container.id',
  'log.level',
  'container.name',
  'host.name',
  'host.hostname',
  'kubernetes.pod.name',
  'kubernetes.pod.uid',
  'cloud.provider',
  'agent.type',
  'event.dataset',
  'event.category',
  'event.module',
  'service.name',
  'service.type',
  'service.version',
  'message',
  'event.original',
  'error.message',
  '@timestamp',
  'data_stream.dataset',
  'data_stream.namespace',
  'data_stream.type',
];
```

@@ -0,0 +1,787 @@
```ts
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { ElasticsearchClient, type Logger } from '@kbn/core/server';
import type { AnalyticsServiceSetup } from '@kbn/core/public';
import { TelemetryPluginStart } from '@kbn/telemetry-plugin/server';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';

import { DataTelemetryEvent } from './types';
import { BREATHE_DELAY_MEDIUM, MAX_STREAMS_TO_REPORT } from './constants';
import { DataTelemetryService } from './data_telemetry_service';

// Mock the constants module to speed up and simplify the tests
jest.mock('./constants', () => ({
  ...jest.requireActual('./constants'),
  BREATHE_DELAY_SHORT: 10,
  BREATHE_DELAY_MEDIUM: 50,
  BREATHE_DELAY_LONG: 100,

  MAX_STREAMS_TO_REPORT: 50,

  LOGS_DATASET_INDEX_PATTERNS: [
    {
      pattern: 'test-pattern-*',
      patternName: 'test',
      shipper: 'custom',
    },
    {
      pattern: 'test-pattern-2-*',
      patternName: 'test-2',
      shipper: 'custom-2',
    },
  ],
}));

const TEST_TIMEOUT = 60 * 1000;
const SYNTH_DOCS = 6000000;

describe('DataTelemetryService', () => {
  let service: DataTelemetryService;
  let mockEsClient: jest.Mocked<ElasticsearchClient>;
  let mockAnalyticsSetup: jest.Mocked<AnalyticsServiceSetup>;
  let mockTelemetryStart: jest.Mocked<TelemetryPluginStart>;
  let mockLogger: jest.Mocked<Logger>;
  let mockTaskManagerSetup: ReturnType<typeof taskManagerMock.createSetup>;
  let mockTaskManagerStart: ReturnType<typeof taskManagerMock.createStart>;
  let runTask: ReturnType<typeof setupMocks>['runTask'];

  describe('Data Telemetry Task', () => {
    beforeEach(async () => {
      const mocks = setupMocks();
      mockEsClient = mocks.mockEsClient;
      mockLogger = mocks.mockLogger;
      mockAnalyticsSetup = mocks.mockAnalyticsSetup;
      mockTelemetryStart = mocks.mockTelemetryStart;
      mockTaskManagerSetup = mocks.taskManagerSetup;
      mockTaskManagerStart = mocks.taskManagerStart;
      runTask = mocks.runTask;

      service = new DataTelemetryService(mockLogger);
      service.setup(mockAnalyticsSetup, mockTaskManagerSetup);
      await service.start(
        mockTelemetryStart,
        {
          elasticsearch: { client: { asInternalUser: mockEsClient } },
        } as any,
        mockTaskManagerStart
      );
    });

    afterEach(() => {
      jest.clearAllTimers();
      jest.clearAllMocks();
    });

    it('should trigger task runner run method', async () => {
      jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true);
      const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend');

      await runTask();

      // Assert collectAndSend is called
      expect(collectAndSendSpy).toHaveBeenCalledTimes(1);
    });
  });

  describe('Docs Info', () => {
    beforeEach(async () => {
      const mocks = setupMocks();
      mockEsClient = mocks.mockEsClient;
      mockLogger = mocks.mockLogger;
      mockAnalyticsSetup = mocks.mockAnalyticsSetup;
      mockTelemetryStart = mocks.mockTelemetryStart;
      mockTaskManagerSetup = mocks.taskManagerSetup;
      mockTaskManagerStart = mocks.taskManagerStart;
      runTask = mocks.runTask;

      service = new DataTelemetryService(mockLogger);
      service.setup(mockAnalyticsSetup, mockTaskManagerSetup);
      await service.start(
        mockTelemetryStart,
        {
          elasticsearch: { client: { asInternalUser: mockEsClient } },
        } as any,
        mockTaskManagerStart
      );

      jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true);
    });

    afterEach(() => {
      jest.clearAllTimers();
      jest.clearAllMocks();
    });

    it(
      'should collect and send telemetry after startup and every interval',
      async () => {
        const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend');

        await runTask();
        expect(collectAndSendSpy).toHaveBeenCalledTimes(1);

        await sleepForBreathDelay();
        expect(mockEsClient.indices.getMapping).toHaveBeenCalledTimes(1);

        await runTask();
        expect(collectAndSendSpy).toHaveBeenCalledTimes(2);

        await sleepForBreathDelay();
        expect(mockEsClient.indices.getMapping).toHaveBeenCalledTimes(2);
      },
      TEST_TIMEOUT
    );

    it(
      'should stop collecting and sending telemetry if stopped',
      async () => {
        const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend');

        await runTask();
        expect(collectAndSendSpy).toHaveBeenCalledTimes(1);

        service.stop();

        await runTask();
        await sleepForBreathDelay();
        expect(collectAndSendSpy).toHaveBeenCalledTimes(1);
      },
      TEST_TIMEOUT
    );

    it(
      'should not collect data if telemetry is not opted in',
      async () => {
        jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(false);

        const collectAndSendSpy = jest.spyOn(service as any, 'collectAndSend');

        await runTask();
        expect(collectAndSendSpy).not.toHaveBeenCalled();

        await runTask();
        await sleepForBreathDelay();
        expect(collectAndSendSpy).not.toHaveBeenCalled();

        // Assert that logger.debug is called with appropriate message
        expect(mockLogger.debug).toHaveBeenCalledWith(
          '[Logs Data Telemetry] Telemetry is not opted-in.'
        );
      },
      TEST_TIMEOUT
    );

    it(
      'should not collect if number of data streams exceed MAX_STREAMS_TO_REPORT',
      async () => {
        (mockEsClient.indices.getDataStream as unknown as jest.Mock).mockResolvedValue({
          data_streams: Array.from({ length: MAX_STREAMS_TO_REPORT + 1 }, (_, i) => ({
            name: `logs-postgresql.log-default-${i}`,
            indices: [
              {
                index_name: `.ds-logs-postgresql.log-default-${i}-000001`,
              },
            ],
            _meta: {
              managed: true,
              description: 'default logs template installed by x-pack',
            },
          })),
        });

        await runTask();
        await sleepForBreathDelay();
        expect(mockEsClient.indices.getMapping).not.toHaveBeenCalled();
      },
      TEST_TIMEOUT
    );

    it(
      'creates and sends the telemetry events',
      async () => {
        jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true);

        const reportEventsSpy = jest.spyOn(service as any, 'reportEvents');

        await runTask();
        await sleepForBreathDelay();

        expect(reportEventsSpy).toHaveBeenCalledTimes(1);
        expect(
          (
            reportEventsSpy.mock?.lastCall as [
              [Partial<DataTelemetryEvent>],
              [Partial<DataTelemetryEvent>]
            ]
          )?.[0]?.[0]
        ).toEqual(
          expect.objectContaining({
            doc_count: 4000 + 500 + 200,
            failure_store_doc_count: 300,
            index_count: 2 + 1 + 1,
            failure_store_index_count: 1,
            namespace_count: 1 + 1,
            size_in_bytes: 10089898 + 800000 + 500000,
            pattern_name: 'test',
            managed_by: ['fleet'],
            package_name: ['activemq'],
            beat: [],
          })
        );
      },
      TEST_TIMEOUT
    );

    it(
      'should not include stats of excluded indices',
      async () => {
        jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true);
        const reportEventsSpy = jest.spyOn(service as any, 'reportEvents');
        await runTask();
        await sleepForBreathDelay();

        expect(reportEventsSpy).toHaveBeenCalledTimes(1);
        const events = reportEventsSpy.mock?.lastCall as [
          [Partial<DataTelemetryEvent>],
          [Partial<DataTelemetryEvent>]
        ];
        // doc_count should be less than SYNTH_DOCS for any event
        (events[0] ?? []).forEach((event) => {
          expect(event.doc_count).toBeLessThan(SYNTH_DOCS);
        });
      },
      TEST_TIMEOUT
    );
  });

  describe('Fields Info and Structure Levels', () => {
    beforeEach(async () => {
      jest.mock('./constants', () => ({
        ...jest.requireActual('./constants'),
        LOGS_DATASET_INDEX_PATTERNS: [
          {
            pattern: 'test-pattern-*',
            patternName: 'test',
            shipper: 'custom',
          },
          {
            pattern: 'test-pattern-3-*',
            patternName: 'test-3',
            shipper: 'custom-3',
          },
        ],
      }));

      const mocks = setupMocks();
      mockEsClient = mocks.mockEsClient;
      mockLogger = mocks.mockLogger;
      mockAnalyticsSetup = mocks.mockAnalyticsSetup;
      mockTelemetryStart = mocks.mockTelemetryStart;
      mockTaskManagerSetup = mocks.taskManagerSetup;
      mockTaskManagerStart = mocks.taskManagerStart;
      runTask = mocks.runTask;

      service = new DataTelemetryService(mockLogger);
      service.setup(mockAnalyticsSetup, mockTaskManagerSetup);
      await service.start(
        mockTelemetryStart,
        {
          elasticsearch: { client: { asInternalUser: mockEsClient } },
        } as any,
        mockTaskManagerStart
      );

      jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true);
    });

    afterEach(() => {
      jest.clearAllTimers();
      jest.clearAllMocks();
    });

    it(
      'should correctly calculate total fields and count of resource fields',
      async () => {
        jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true);

        const reportEventsSpy = jest.spyOn(service as any, 'reportEvents');

        await runTask();
        await sleepForBreathDelay();

        expect(reportEventsSpy).toHaveBeenCalledTimes(1);
        const lastCall = reportEventsSpy.mock?.lastCall?.[0] as [Partial<DataTelemetryEvent>];
        expect(lastCall?.[0]?.field_count).toBe(8);
        expect(lastCall?.[0]?.field_existence).toEqual({
          'container.id': 3000 + 500,
          'host.name': 3000 + 500,
          message: 3000,
          '@timestamp': 3000 + 500 + 200,
          'data_stream.dataset': 3000 + 500 + 200,
          'data_stream.namespace': 3000 + 500 + 200,
          'data_stream.type': 3000 + 500 + 200,
        });
      },
      TEST_TIMEOUT
    );

    it('should correctly calculate structure levels', async () => {
      jest.spyOn(service as any, 'shouldCollectTelemetry').mockResolvedValue(true);

      const reportEventsSpy = jest.spyOn(service as any, 'reportEvents');

      await runTask();
      await sleepForBreathDelay();

      expect(reportEventsSpy).toHaveBeenCalledTimes(1);
      const lastCall = reportEventsSpy.mock?.lastCall?.[0] as [
        Partial<DataTelemetryEvent>,
        Partial<DataTelemetryEvent>
      ];
      expect(lastCall?.[1]?.structure_level).toEqual({
        '1': 1000,
        '4': 500,
        '6': 200,
      });
    });
  });
});

function sleepForBreathDelay() {
  return new Promise((resolve) => setTimeout(resolve, BREATHE_DELAY_MEDIUM * 10));
}

function setupMocks() {
  const mockEsClient = {
    indices: {
      stats: jest.fn().mockImplementation(() => {
        const emptyAllStats: any = { _all: {} };

        // _all shouldn't be read hence overriding with empty here
        return Promise.resolve({
          ...emptyAllStats,
          indices: {
            ...emptyAllStats.indices,
            ...MOCK_POSTGRES_DEFAULT_STATS.indices,
            ...MOCK_POSTGRES_NON_DEFAULT_STATS.indices,
            ...MOCK_ACTIVE_MQ_DEFAULT_STATS.indices,
            ...MOCK_FLUENT_BIT_DEFAULT_STATS.indices,
            ...MOCK_SYNTH_DATA_STATS.indices,
          },
        });
      }),
      getDataStream: jest.fn().mockImplementation((params) => {
        if (params.name === 'test-pattern-2-*') {
          return Promise.resolve({
            data_streams: MOCK_FLUENT_BIT_DATA_STREAMS,
          });
        }

        return Promise.resolve({
          data_streams: [
            ...MOCK_POSTGRES_DATA_STREAMS,
            ...MOCK_SYNTH_DATA_STREAMS,
            ...MOCK_ACTIVE_MQ_FLEET_DATA_STREAMS,
          ],
        });
      }),
      get: jest.fn().mockResolvedValue(MOCK_INDICES),
      getMapping: jest.fn().mockImplementation(() => {
        return Promise.resolve({
          ...MOCK_APACHE_GENERIC_INDEX_MAPPING,
          ...MOCK_POSTGRES_DEFAULT_MAPPINGS,
          ...MOCK_POSTGRES_NON_DEFAULT_MAPPINGS,
          ...MOCK_ACTIVE_MQ_DEFAULT_MAPPINGS,
          ...MOCK_FLUENT_BIT_DEFAULT_MAPPINGS,
        });
      }),
    },
    info: jest.fn().mockResolvedValue({}),
    transport: {
      request: jest.fn().mockImplementation((params) => {
        if (params.path?.includes('_stats') && params?.querystring?.failure_store === 'only') {
          return MOCK_ACTIVE_MQ_FAILURE_STATS;
        }

        return MOCK_ACTIVE_MQ_FAILURE_STATS;
      }),
    },
  } as unknown as jest.Mocked<ElasticsearchClient>;

  const mockLogger = {
    debug: jest.fn(),
    warn: jest.fn(),
    error: jest.fn(),
  } as unknown as jest.Mocked<Logger>;

  const mockAnalyticsSetup = {
    getTelemetryUrl: jest.fn().mockResolvedValue(new URL('https://telemetry.elastic.co')),
  } as unknown as jest.Mocked<AnalyticsServiceSetup>;

  const mockTelemetryStart = {
    getIsOptedIn: jest.fn().mockResolvedValue(true),
  } as unknown as jest.Mocked<TelemetryPluginStart>;

  const taskManagerSetup = taskManagerMock.createSetup();
  const taskManagerStart = taskManagerMock.createStart();

  const runTask = () => {
    const taskDefinitions = taskManagerSetup.registerTaskDefinitions.mock.calls[0][0];
    const taskType = Object.keys(taskDefinitions)[0];
    const taskRunner = taskDefinitions[taskType].createTaskRunner({ taskInstance: {} as any });

    return taskRunner.run();
  };

  return {
    mockEsClient,
    mockLogger,
    mockAnalyticsSetup,
    mockTelemetryStart,
    taskManagerSetup,
    taskManagerStart,
    runTask,
  };
}

const MOCK_INDICES = {
  'apache-generic-index': {},
  '.ds-logs-postgresql.log-default-2024.07.31-000001': {},
  '.ds-logs-postgresql.log-default-2024.08.31-000002': {},
  '.ds-logs-synth.01-default-2024.07.31-000001': {},
  '.ds-logs-active-mq.fleet-2024.07.31-000001': {},
};

const MOCK_POSTGRES_DATA_STREAMS = [
  {
    name: 'logs-postgresql.log-default',
    indices: [
      {
        index_name: '.ds-logs-postgresql.log-default-2024.07.31-000001',
      },
      {
        index_name: '.ds-logs-postgresql.log-default-2024.08.31-000002',
      },
    ],
    _meta: {
      managed: true,
      description: 'default logs template installed by x-pack',
    },
  },
  {
    name: 'logs-postgresql.log-non-default',
    indices: [
      {
        index_name: '.ds-logs-postgresql.log-non-default-2024.07.31-000001',
      },
    ],
    _meta: {
      managed: true,
      description: 'default logs template installed by x-pack',
    },
  },
];
const MOCK_POSTGRES_DEFAULT_STATS = {
  _all: {
    ...getPrimaryDocsAndStoreSize(1000 + 3000, 1000000 + 9089898),
  },
  indices: {
    '.ds-logs-postgresql.log-default-2024.07.31-000001': getPrimaryDocsAndStoreSize(1000, 1000000),
    '.ds-logs-postgresql.log-default-2024.08.31-000002': getPrimaryDocsAndStoreSize(3000, 9089898),
  },
};

const MOCK_POSTGRES_NON_DEFAULT_STATS = {
  _all: {},
  indices: {
    '.ds-logs-postgresql.log-non-default-2024.07.31-000001': getPrimaryDocsAndStoreSize(
      500,
      800000
    ),
  },
};

const MOCK_POSTGRES_DEFAULT_MAPPINGS = {
  '.ds-logs-postgresql.log-default-2024.08.31-000002': {
    mappings: {
      properties: {
        ...getTimestampProp(),
        ...getContainerProp(),
        ...getDataStreamProps('postgresql.log', 'default', 'logs'),
        ...getHostProp(),
        ...getMessageProp(),
        custom_field_01: {
          type: 'float',
        },
      },
    },
  },
};

const MOCK_POSTGRES_NON_DEFAULT_MAPPINGS = {
  '.ds-logs-postgresql.log-non-default-2024.07.31-000001': {
    mappings: {
      properties: {
        ...getTimestampProp(),
        ...getContainerProp(),
        ...getDataStreamProps('postgresql.log', 'non-default', 'logs'),
        ...getHostProp(),
        custom_field_01: {
          type: 'float',
        },
      },
    },
  },
};

const MOCK_SYNTH_DATA_STREAMS = [
  {
    name: 'logs-synth.01-default',
    indices: [
      {
        index_name: '.ds-logs-synth.01-default-2024.07.31-000001',
      },
      {
        index_name: '.ds-logs-synth.01-default-2024.08.31-000002',
      },
    ],
    _meta: {
      managed: true,
      description: 'default logs template installed by x-pack',
    },
  },
];

// Docs from synth data shouldn't be counted in the telemetry events
const MOCK_SYNTH_DATA_STATS = {
  _all: {},
  indices: {
    '.ds-logs-synth.01-default-2024.07.31-000001': getPrimaryDocsAndStoreSize(
      SYNTH_DOCS,
      1000000000
    ),
  },
};

const MOCK_APACHE_GENERIC_INDEX_MAPPING = {
  'apache-generic-index': {
    mappings: {
      properties: {
        ...getTimestampProp(),
      },
    },
  },
};

const MOCK_ACTIVE_MQ_FLEET_DATA_STREAMS = [
  {
    name: 'logs-active-mq.fleet',
    indices: [
      {
        index_name: '.ds-logs-active-mq.fleet-2024.07.31-000001',
      },
    ],
    _meta: {
      package: {
        name: 'activemq',
      },
      managed_by: 'fleet',
      managed: true,
    },
  },
];

const MOCK_ACTIVE_MQ_DEFAULT_STATS = {
  _all: {},
  indices: {
    '.ds-logs-active-mq.fleet-2024.07.31-000001': getPrimaryDocsAndStoreSize(200, 500000),
  },
};

const MOCK_ACTIVE_MQ_FAILURE_STATS = {
  _all: {},
  indices: {
    '.fs-logs-active-mq.fleet-2024.07.31-000001': getPrimaryDocsAndStoreSize(300, 700000),
  },
};

const MOCK_ACTIVE_MQ_DEFAULT_MAPPINGS = {
  '.ds-logs-active-mq.fleet-2024.07.31-000001': {
    mappings: {
      properties: {
        ...getTimestampProp(),
        ...getDataStreamProps('active-mq.fleet', 'default', 'logs'),
      },
    },
  },
};

const MOCK_FLUENT_BIT_DATA_STREAMS = [
  {
    name: 'logs-fluent-bit.fleet',
    indices: [
      {
        index_name: '.ds-logs-fluent-bit.fleet-2024.07.31-000001',
      },
      {
        index_name: '.ds-logs-fluent-bit.fleet-2024.07.31-000002',
      },
      {
        index_name: '.ds-logs-fluent-bit.fleet-2024.07.31-000003',
      },
    ],
    _meta: {
      managed: true,
      description: 'default logs template installed by x-pack',
    },
  },
];

const MOCK_FLUENT_BIT_DEFAULT_MAPPINGS = {
  '.ds-logs-fluent-bit.fleet-2024.07.31-000001': {
    // Level 01
    mappings: {
      properties: {
        ...getTimestampProp(),
        ...getDataStreamProps('fluent-bit.fleet', 'default', 'logs'),
        ...getEcsVersionProp(),
      },
    },
  },
  '.ds-logs-fluent-bit.fleet-2024.07.31-000002': {
    // Level 04
    mappings: {
      properties: {
        ...getTimestampProp(),
        ...getHostProp(),
        ...getMessageProp(),
      },
    },
  },
  '.ds-logs-fluent-bit.fleet-2024.07.31-000003': {
    // Level 06
    mappings: {
      properties: {
        ...getTimestampProp(),
        ...getHostProp(),
        ...getMessageProp(),
        ...getEcsVersionProp(),
      },
    },
  },
};

const MOCK_FLUENT_BIT_DEFAULT_STATS = {
  _all: {},
  indices: {
    '.ds-logs-fluent-bit.fleet-2024.07.31-000001': getPrimaryDocsAndStoreSize(1000, 1000000),
    '.ds-logs-fluent-bit.fleet-2024.07.31-000002': getPrimaryDocsAndStoreSize(500, 800000),
    '.ds-logs-fluent-bit.fleet-2024.07.31-000003': getPrimaryDocsAndStoreSize(200, 500000),
  },
};

function getPrimaryDocsAndStoreSize(docs: number, storeSize: number) {
  return {
    primaries: {
      docs: {
        count: docs,
        deleted: 0,
        total_size_in_bytes: storeSize,
      },
      store: {
        size_in_bytes: storeSize,
      },
    },
  };
}

function getTimestampProp() {
  return {
    '@timestamp': {
      type: 'date',
      ignore_malformed: false,
    },
  };
}

function getDataStreamProps(dataset: string, namespace: string, type: string) {
  return {
    data_stream: {
      properties: {
        dataset: {
          type: 'constant_keyword',
          value: dataset,
        },
        namespace: {
          type: 'constant_keyword',
          value: namespace,
        },
        type: {
          type: 'constant_keyword',
          value: type,
        },
      },
    },
  };
}

function getContainerProp() {
  return {
    container: {
      properties: {
        id: {
          type: 'text',
          fields: {
            keyword: {
              type: 'keyword',
              ignore_above: 256,
            },
          },
        },
      },
    },
  };
}

function getHostProp() {
  return {
    host: {
      properties: {
        name: {
          type: 'text',
          fields: {
            keyword: {
              type: 'keyword',
              ignore_above: 256,
            },
          },
        },
      },
    },
  };
}

function getEcsVersionProp() {
  return {
    ecs: {
      properties: {
        version: {
          type: 'keyword',
        },
      },
    },
  };
}

function getMessageProp() {
  return {
    message: {
      type: 'text',
    },
  };
}
```

@@ -0,0 +1,261 @@
```ts
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import {
  from,
  defer,
  delay,
  filter,
  tap,
  take,
  takeWhile,
  exhaustMap,
  switchMap,
  map,
  of,
  EMPTY,
} from 'rxjs';
import type { CoreStart, ElasticsearchClient, Logger } from '@kbn/core/server';
import type { AnalyticsServiceSetup } from '@kbn/core/public';
import {
  TaskInstance,
  TaskManagerSetupContract,
  TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import type { TelemetryPluginStart } from '@kbn/telemetry-plugin/server';

import {
  BREATHE_DELAY_MEDIUM,
  BREATHE_DELAY_SHORT,
  NON_LOG_SIGNALS,
  EXCLUDE_ELASTIC_LOGS,
  MAX_STREAMS_TO_REPORT,
  LOGS_DATASET_INDEX_PATTERNS,
  LOGS_DATA_TELEMETRY_TASK_TYPE,
  TELEMETRY_TASK_INTERVAL,
  LOGS_DATA_TELEMETRY_TASK_ID,
  TELEMETRY_TASK_TIMEOUT,
} from './constants';
import {
  getAllIndices,
  addMappingsToIndices,
  addNamespace,
  groupStatsByPatternName,
  getIndexBasicStats,
  indexStatsToTelemetryEvents,
  getIndexFieldStats,
} from './helpers';

import { DataTelemetryEvent } from './types';

export class DataTelemetryService {
  private readonly logger: Logger;
  private isStopped = false;

  private telemetryStart?: TelemetryPluginStart;

  // @ts-ignore: Unused variable
  private analytics?: AnalyticsServiceSetup;

  // @ts-ignore: Unused variable
  private isInProgress = false;

  private isOptedIn?: boolean = true; // Assume true until the first check
  private esClient?: ElasticsearchClient;

  private run$ = defer(() => from(this.shouldCollectTelemetry())).pipe(
    takeWhile(() => !this.isStopped),
    tap((isOptedIn) => {
      if (!isOptedIn) {
        this.logTelemetryNotOptedIn();
        this.isInProgress = false;
      } else {
        this.isInProgress = true;
      }
    }),
    filter((isOptedIn) => isOptedIn),
    exhaustMap(() => this.collectAndSend()),
    tap(() => (this.isInProgress = false))
  );

  constructor(logger: Logger) {
    this.logger = logger;
  }

  public setup(analytics: AnalyticsServiceSetup, taskManager: TaskManagerSetupContract) {
    this.analytics = analytics;
    this.registerTask(taskManager);
  }

  public async start(
    telemetryStart: TelemetryPluginStart,
    core: CoreStart,
    taskManager: TaskManagerStartContract
  ) {
    this.telemetryStart = telemetryStart;
    this.esClient = core?.elasticsearch.client.asInternalUser;

    if (taskManager) {
      const taskInstance = await this.scheduleTask(taskManager);
      if (taskInstance) {
        this.logger.debug(`Task ${taskInstance.id} scheduled.`);
      }
    }
  }

  public stop() {
    this.isStopped = true;
  }

  private registerTask(taskManager: TaskManagerSetupContract) {
    const service = this;
    taskManager.registerTaskDefinitions({
      [LOGS_DATA_TELEMETRY_TASK_TYPE]: {
        title: 'Logs Data Telemetry',
        description:
          'This task collects data telemetry for logs data and sends it to the telemetry service.',
        timeout: `${TELEMETRY_TASK_TIMEOUT}m`,
        maxAttempts: 1, // Do not retry

        createTaskRunner: () => {
          return {
            // Perform the work of the task. The return value should fit the TaskResult interface.
            async run() {
              service.logger.debug(`[Logs Data Telemetry] Running task`);

              try {
                service.run$.pipe(take(1)).subscribe({
                  complete: () => {
                    service.logger.debug(`[Logs Data Telemetry] Task completed`);
                  },
                });
              } catch (e) {
                service.logger.error(e);
              }
            },
            async cancel() {
              service.logger.debug(`[Logs Data Telemetry] Task cancelled`);
            },
          };
        },
      },
    });
  }

  private async scheduleTask(taskManager: TaskManagerStartContract): Promise<TaskInstance | null> {
    try {
      const taskInstance = await taskManager.ensureScheduled({
        id: LOGS_DATA_TELEMETRY_TASK_ID,
        taskType: LOGS_DATA_TELEMETRY_TASK_TYPE,
        schedule: {
          interval: `${TELEMETRY_TASK_INTERVAL}m`,
        },
        params: {},
        state: {},
        scope: ['logs'],
      });

      this.logger?.debug(
        `Task ${LOGS_DATA_TELEMETRY_TASK_ID} scheduled with interval ${taskInstance.schedule?.interval}.`
      );

      return taskInstance;
    } catch (e) {
      this.logger?.error(
        `Failed to schedule task ${LOGS_DATA_TELEMETRY_TASK_ID} with interval ${TELEMETRY_TASK_INTERVAL}. ${e?.message}`
      );

      return null;
    }
  }

  public async shouldCollectTelemetry() {
    if (process.env.CI) {
      return false; // Telemetry collection flow should not run in CI
    }

    this.isOptedIn = await this.telemetryStart?.getIsOptedIn();
    return this.isOptedIn === true;
  }

  private collectAndSend() {
    // Gather data streams and indices related to each stream of log
    if (this.esClient) {
      return getAllIndices({
        esClient: this.esClient,
        logsIndexPatterns: LOGS_DATASET_INDEX_PATTERNS,
        excludeStreamsStartingWith: [...NON_LOG_SIGNALS, ...EXCLUDE_ELASTIC_LOGS],
        breatheDelay: BREATHE_DELAY_MEDIUM,
      }).pipe(
        switchMap((dataStreamsAndIndicesInfo) => {
          if (dataStreamsAndIndicesInfo.length > MAX_STREAMS_TO_REPORT) {
            this.logger.debug(
              `[Logs Data Telemetry] Number of data streams exceeds ${MAX_STREAMS_TO_REPORT}. Skipping telemetry collection.`
            );
            return EMPTY;
          }
          return of(dataStreamsAndIndicesInfo);
        }),
        delay(BREATHE_DELAY_MEDIUM),
        switchMap((dataStreamsAndIndicesInfo) => {
          return addMappingsToIndices({
            esClient: this.esClient!,
            dataStreamsInfo: dataStreamsAndIndicesInfo,
            logsIndexPatterns: LOGS_DATASET_INDEX_PATTERNS,
          });
        }),
        delay(BREATHE_DELAY_SHORT),
        switchMap((dataStreamsAndIndicesInfo) => {
          return addNamespace({
            dataStreamsInfo: dataStreamsAndIndicesInfo,
          });
        }),
        delay(BREATHE_DELAY_MEDIUM),
        switchMap((infoWithNamespace) => {
          return getIndexBasicStats({
            esClient: this.esClient!,
            indices: infoWithNamespace,
            breatheDelay: BREATHE_DELAY_MEDIUM,
          });
        }),
        delay(BREATHE_DELAY_SHORT),
        switchMap((infoWithStats) => {
          return getIndexFieldStats({
            basicStats: infoWithStats,
          });
        }),
        delay(BREATHE_DELAY_SHORT),
        map((statsWithNamespace) => {
          return groupStatsByPatternName(statsWithNamespace);
        }),
        map((statsByPattern) => {
          return indexStatsToTelemetryEvents(statsByPattern);
        }),
        delay(BREATHE_DELAY_SHORT),
        switchMap((dataTelemetryEvents) => {
          return from(this.reportEvents(dataTelemetryEvents));
        })
      );
    } else {
      this.logger.warn(
        `[Logs Data Telemetry] Elasticsearch client is unavailable: cannot retrieve data streams
        for stream of logs`
      );

      return EMPTY;
    }
  }

  private async reportEvents(events: DataTelemetryEvent[]) {
    // TODO: Implement reporting events via analytics service
    return Promise.resolve(events);
  }

  private logTelemetryNotOptedIn() {
    this.logger.debug(`[Logs Data Telemetry] Telemetry is not opted-in.`);
  }
}
```

@ -0,0 +1,575 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { intersection } from 'lodash';
|
||||
import { from, of, Observable, concatMap, delay, map, toArray, forkJoin } from 'rxjs';
|
||||
import {
|
||||
MappingPropertyBase,
|
||||
IndicesGetMappingResponse,
|
||||
IndicesStatsResponse,
|
||||
} from '@elastic/elasticsearch/lib/api/types';
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import { DataStreamFieldStatsPerNamespace, DatasetIndexPattern } from './types';
|
||||
|
||||
import {
|
||||
IndexBasicInfo,
|
||||
DataStreamStatsPerNamespace,
|
||||
DataStreamStats,
|
||||
DataTelemetryEvent,
|
||||
} from './types';
|
||||
import {
|
||||
DATA_TELEMETRY_FIELDS,
|
||||
LEVEL_2_RESOURCE_FIELDS,
|
||||
PROMINENT_LOG_ECS_FIELDS,
|
||||
} from './constants';
|
||||
|
||||
/**
|
||||
* Retrieves all indices and data streams for each stream of logs.
|
||||
*/
|
||||
export function getAllIndices({
|
||||
esClient,
|
||||
logsIndexPatterns,
|
||||
excludeStreamsStartingWith,
|
||||
breatheDelay,
|
||||
}: {
|
||||
esClient: ElasticsearchClient;
|
||||
logsIndexPatterns: DatasetIndexPattern[];
|
||||
excludeStreamsStartingWith: string[];
|
||||
breatheDelay: number; // Breathing time between each request to prioritize other cluster operations
|
||||
}): Observable<IndexBasicInfo[]> {
|
||||
const uniqueIndices = new Set<string>();
|
||||
const indicesInfo: IndexBasicInfo[] = [];
|
||||
|
||||
return from(logsIndexPatterns).pipe(
|
||||
concatMap((pattern) =>
|
||||
of(pattern).pipe(
|
||||
delay(breatheDelay),
|
||||
concatMap(() => {
|
||||
return forkJoin([
|
||||
from(getDataStreamsInfoForPattern({ esClient, pattern })),
|
||||
from(getIndicesInfoForPattern({ esClient, pattern })),
|
||||
]);
|
||||
}),
|
||||
map(([patternDataStreamsInfo, patternIndicesInfo]) => {
|
||||
return [...patternDataStreamsInfo, ...patternIndicesInfo];
|
||||
}),
|
||||
map((indicesAndDataStreams) => {
|
||||
// Exclude indices that have already been dealt with
|
||||
return indicesAndDataStreams.filter((dataStream) => {
|
||||
return !uniqueIndices.has(dataStream.name);
|
||||
});
|
||||
}),
|
||||
map((indicesAndDataStreams) => {
|
||||
// Exclude internal or backing indices
|
||||
return indicesAndDataStreams.filter((dataStream) => !dataStream.name.startsWith('.'));
|
||||
}),
|
||||
map((indicesAndDataStreams) => {
|
||||
return indicesAndDataStreams.filter(
|
||||
// Exclude streams starting with non log known signals
|
||||
(dataStream) =>
|
||||
!excludeStreamsStartingWith.some((excludeStream) =>
|
||||
dataStream.name.startsWith(excludeStream)
|
||||
)
|
||||
);
|
||||
}),
|
||||
map((indicesAndDataStreams) => {
|
||||
indicesAndDataStreams.forEach((dataStream) => {
|
||||
uniqueIndices.add(dataStream.name);
|
||||
});
|
||||
return indicesAndDataStreams;
|
||||
}),
|
||||
map((dataStreamsInfoRecords) => {
|
||||
indicesInfo.push(...dataStreamsInfoRecords);
|
||||
return dataStreamsInfoRecords;
|
||||
})
|
||||
)
|
||||
),
|
||||
toArray(),
|
||||
map(() => indicesInfo)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the mappings at once and adds to the indices info.
|
||||
*/
|
||||
export function addMappingsToIndices({
|
||||
esClient,
|
||||
dataStreamsInfo,
|
||||
logsIndexPatterns,
|
||||
}: {
|
||||
esClient: ElasticsearchClient;
|
||||
dataStreamsInfo: IndexBasicInfo[];
|
||||
logsIndexPatterns: DatasetIndexPattern[];
|
||||
}): Observable<IndexBasicInfo[]> {
|
||||
return from(
|
||||
esClient.indices.getMapping({
|
||||
index: logsIndexPatterns.map((pattern) => pattern.pattern),
|
||||
})
|
||||
).pipe(
|
||||
map((mappings) => {
|
||||
return dataStreamsInfo.map((info) => {
|
||||
// Add mapping for each index
|
||||
info.indices.forEach((index) => {
|
||||
if (mappings[index]) {
|
||||
info.mapping = { ...(info.mapping ?? {}), [index]: mappings[index] };
|
||||
}
|
||||
});
|
||||
|
||||
return info;
|
||||
});
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the namespace of the index from index mapping if available.
|
||||
*/
|
||||
export function addNamespace({
|
||||
dataStreamsInfo,
|
||||
}: {
|
||||
dataStreamsInfo: IndexBasicInfo[];
|
||||
}): Observable<IndexBasicInfo[]> {
|
||||
return from(dataStreamsInfo).pipe(
|
||||
concatMap((indexInfo) =>
|
||||
of(indexInfo).pipe(
|
||||
map((dataStream) => getIndexNamespace(dataStream)),
|
||||
map((namespace) => {
|
||||
indexInfo.namespace = namespace;
|
||||
return indexInfo;
|
||||
})
|
||||
)
|
||||
),
|
||||
toArray()
|
||||
);
|
||||
}
|
||||
|
||||
export function groupStatsByPatternName(
|
||||
dataStreamsStats: DataStreamFieldStatsPerNamespace[]
|
||||
): DataStreamStats[] {
|
||||
const uniqueNamespaces = new Set<string>();
|
||||
const uniqueFields = new Set<string>();
|
||||
const statsByStream = dataStreamsStats.reduce<Map<string, DataStreamStats>>((acc, stats) => {
|
||||
if (!stats.patternName) {
|
||||
return acc;
|
||||
}
|
||||
|
||||
if (!acc.get(stats.patternName)) {
|
||||
acc.set(stats.patternName, {
|
||||
streamName: stats.patternName,
|
||||
shipper: stats.shipper,
|
||||
totalNamespaces: 0,
|
||||
totalDocuments: 0,
|
||||
failureStoreDocuments: 0,
|
||||
failureStoreIndices: 0,
|
||||
totalSize: 0,
|
||||
totalIndices: 0,
|
||||
totalFields: 0,
|
||||
structureLevel: {},
|
||||
fieldsCount: {},
|
||||
managedBy: [],
|
||||
packageName: [],
|
||||
beat: [],
|
||||
});
|
||||
}
|
||||
|
||||
const streamStats = acc.get(stats.patternName)!;
|
||||
|
||||
// Track unique namespaces
|
||||
if (stats.namespace) {
|
||||
uniqueNamespaces.add(stats.namespace);
|
||||
}
|
||||
streamStats.totalNamespaces = uniqueNamespaces.size;
|
||||
|
||||
// Track unique fields
|
||||
stats.uniqueFields.forEach((field) => uniqueFields.add(field));
|
||||
streamStats.totalFields = uniqueFields.size;
|
||||
|
||||
// Aggregate structure levels
|
||||
for (const [level, count] of Object.entries(stats.structureLevel)) {
|
||||
streamStats.structureLevel[Number(level)] =
|
||||
(streamStats.structureLevel[Number(level)] ?? 0) + count;
|
||||
}
|
||||
|
||||
streamStats.totalDocuments += stats.totalDocuments;
|
||||
streamStats.totalIndices += stats.totalIndices;
|
||||
streamStats.failureStoreDocuments += stats.failureStoreDocuments;
|
||||
streamStats.failureStoreIndices += stats.failureStoreIndices;
|
||||
streamStats.totalSize += stats.totalSize;
|
||||
|
||||
for (const [field, count] of Object.entries(stats.fieldsCount)) {
|
||||
streamStats.fieldsCount[field] = (streamStats.fieldsCount[field] ?? 0) + count;
|
||||
}
|
||||
|
||||
if (stats.meta?.managed_by) {
|
||||
streamStats.managedBy.push(stats.meta.managed_by);
|
||||
}
|
||||
|
||||
if (stats.meta?.package?.name) {
|
||||
streamStats.packageName.push(stats.meta.package.name);
|
||||
}
|
||||
|
||||
if (stats.meta?.beat) {
|
||||
streamStats.beat.push(stats.meta.beat);
|
||||
}
|
||||
|
||||
return acc;
|
||||
}, new Map());
|
||||
|
||||
return Array.from(statsByStream.values());
|
||||
}
|
||||
|
||||
export function getIndexBasicStats({
|
||||
esClient,
|
||||
indices,
|
||||
breatheDelay,
|
||||
}: {
|
||||
esClient: ElasticsearchClient;
|
||||
indices: IndexBasicInfo[];
|
||||
breatheDelay: number;
|
||||
}): Observable<DataStreamStatsPerNamespace[]> {
|
||||
const indexNames = indices.map((info) => info.name);
|
||||
|
||||
return from(
|
||||
esClient.indices.stats({
|
||||
index: indexNames,
|
||||
})
|
||||
).pipe(
|
||||
delay(breatheDelay),
|
||||
concatMap((allIndexStats) => {
|
||||
return from(getFailureStoreStats({ esClient, indexName: indexNames.join(',') })).pipe(
|
||||
map((allFailureStoreStats) => {
|
||||
return indices.map((info) =>
|
||||
getIndexStats(allIndexStats.indices, allFailureStoreStats, info)
|
||||
);
|
||||
})
|
||||
);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
export function getIndexFieldStats({
|
||||
basicStats,
|
||||
}: {
|
||||
basicStats: DataStreamStatsPerNamespace[];
|
||||
}): Observable<DataStreamFieldStatsPerNamespace[]> {
|
||||
return from(basicStats).pipe(
|
||||
map((stats) => getFieldStatsAndStructureLevels(stats, DATA_TELEMETRY_FIELDS)),
|
||||
toArray()
|
||||
);
|
||||
}
|
||||
|
||||
export function indexStatsToTelemetryEvents(stats: DataStreamStats[]): DataTelemetryEvent[] {
|
||||
return stats.map((stat) => ({
|
||||
pattern_name: stat.streamName,
|
||||
shipper: stat.shipper,
|
||||
doc_count: stat.totalDocuments,
|
||||
structure_level: stat.structureLevel,
|
||||
index_count: stat.totalIndices,
|
||||
failure_store_doc_count: stat.failureStoreDocuments,
|
||||
failure_store_index_count: stat.failureStoreIndices,
|
||||
namespace_count: stat.totalNamespaces,
|
||||
field_count: stat.totalFields,
|
||||
field_existence: stat.fieldsCount,
|
||||
size_in_bytes: stat.totalSize,
|
||||
managed_by: Array.from(new Set(stat.managedBy)),
|
||||
package_name: Array.from(new Set(stat.packageName)),
|
||||
beat: Array.from(new Set(stat.beat)),
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves information about data streams matching a given pattern.
|
||||
*/
|
||||
async function getDataStreamsInfoForPattern({
|
||||
esClient,
|
||||
pattern,
|
||||
}: {
|
||||
esClient: ElasticsearchClient;
|
||||
pattern: DatasetIndexPattern;
|
||||
}): Promise<IndexBasicInfo[]> {
|
||||
const resp = await esClient.indices.getDataStream({
|
||||
name: pattern.pattern,
|
||||
expand_wildcards: 'all',
|
||||
});
|
||||
|
||||
return resp.data_streams.map((dataStream) => ({
|
||||
patternName: pattern.patternName,
|
||||
shipper: pattern.shipper,
|
||||
isDataStream: true,
|
||||
name: dataStream.name,
|
||||
indices: dataStream.indices.map((index) => index.index_name),
|
||||
mapping: undefined,
|
||||
meta: dataStream._meta,
|
||||
}));
|
||||
}
|
||||
|
||||
async function getIndicesInfoForPattern({
|
||||
esClient,
|
||||
pattern,
|
||||
}: {
|
||||
esClient: ElasticsearchClient;
|
||||
pattern: DatasetIndexPattern;
|
||||
}): Promise<IndexBasicInfo[]> {
|
||||
const resp = await esClient.indices.get({
|
||||
index: pattern.pattern,
|
||||
});
|
||||
|
||||
return Object.entries(resp).map(([index, indexInfo]) => {
|
||||
// This is needed to keep the format same for data streams and indices
|
||||
const indexMapping: IndicesGetMappingResponse | undefined = indexInfo.mappings
|
||||
? {
|
||||
[index]: { mappings: indexInfo.mappings },
|
||||
}
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
patternName: pattern.patternName,
|
||||
shipper: pattern.shipper,
|
||||
isDataStream: false,
|
||||
name: index,
|
||||
indices: [index],
|
||||
mapping: indexMapping,
|
||||
meta: indexInfo.mappings?._meta,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the namespace of index by checking the mappings of backing indices.
|
||||
*
|
||||
* @param {Object} indexInfo - The information about the index.
|
||||
* @returns {string | undefined} - The namespace of the data stream found in the mapping.
|
||||
*/
|
||||
function getIndexNamespace(indexInfo: IndexBasicInfo): string | undefined {
|
||||
for (let i = 0; i < indexInfo.indices.length; i++) {
|
||||
const index = indexInfo.indices[i];
|
||||
const indexMapping = indexInfo.mapping?.[index]?.mappings;
|
||||
const dataStreamMapping: MappingPropertyBase | undefined =
|
||||
indexMapping?.properties?.data_stream;
|
||||
if (!dataStreamMapping) {
|
||||
continue;
|
||||
}
|
||||
const namespace = (dataStreamMapping?.properties?.namespace as { value?: string })?.value;
|
||||
if (namespace) {
|
||||
return namespace;
|
||||
}
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function getFailureStoreStats({
|
||||
esClient,
|
||||
indexName,
|
||||
}: {
|
||||
esClient: ElasticsearchClient;
|
||||
indexName: string;
|
||||
}): Promise<IndicesStatsResponse['indices']> {
|
||||
try {
|
||||
// TODO: Use the failure store API when it is available
|
||||
const resp = await esClient.transport.request<ReturnType<typeof esClient.indices.stats>>({
|
||||
method: 'GET',
|
||||
path: `/${indexName}/_stats`,
|
||||
querystring: {
|
||||
failure_store: 'only',
|
||||
},
|
||||
});
|
||||
|
||||
return (await resp).indices;
|
||||
} catch (e) {
|
||||
// Failure store API may not be available
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
export function getIndexStats(
|
||||
allIndexStats: IndicesStatsResponse['indices'],
|
||||
allFailureStoreStats: IndicesStatsResponse['indices'],
|
||||
info: IndexBasicInfo
|
||||
): DataStreamStatsPerNamespace {
|
||||
let totalDocs = 0;
|
||||
let totalSize = 0;
|
||||
let totalIndices = 0;
|
||||
const indexStats: IndicesStatsResponse['indices'] = {};
|
||||
let failureStoreDocs = 0;
|
||||
let failureStoreIndices = 0;
|
||||
const failureStoreStats: IndicesStatsResponse['indices'] = {};
|
||||
Object.entries(allIndexStats ?? {}).forEach(([indexName, stats]) => {
|
||||
if (indexName.includes(info.name)) {
|
||||
totalDocs += stats.primaries?.docs?.count ?? 0;
|
||||
totalSize += stats.primaries?.store?.size_in_bytes ?? 0;
|
||||
totalIndices++;
|
||||
|
||||
indexStats[indexName] = stats;
|
||||
}
|
||||
});
|
||||
|
||||
Object.entries(allFailureStoreStats ?? {}).forEach(([indexName, stats]) => {
|
||||
if (indexName.includes(info.name)) {
|
||||
failureStoreDocs += stats.primaries?.docs?.count ?? 0;
|
||||
failureStoreIndices++;
|
||||
|
||||
failureStoreStats[indexName] = stats;
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
patternName: info.patternName,
|
||||
shipper: info.shipper,
|
||||
namespace: info.namespace,
|
||||
totalDocuments: totalDocs,
|
||||
totalSize,
|
||||
totalIndices,
|
||||
failureStoreDocuments: failureStoreDocs,
|
||||
failureStoreIndices,
|
||||
meta: info.meta,
|
||||
mapping: info.mapping,
|
||||
indexStats,
|
||||
failureStoreStats,
|
||||
};
|
||||
}
|
||||
|
||||
function getFieldStatsAndStructureLevels(
|
||||
stats: DataStreamStatsPerNamespace,
|
||||
fieldsToCheck: string[]
|
||||
): DataStreamFieldStatsPerNamespace {
|
||||
const uniqueFields = new Set<string>();
|
||||
const structureLevel: Record<number, number> = {};
|
||||
|
||||
// Loop through each index and get the number of fields and gather how many documents have that field
|
||||
const resourceFieldCounts: Record<string, number> = {};
|
||||
const indexNames = Object.keys(stats.indexStats ?? {});
|
||||
for (const backingIndex of indexNames) {
|
||||
const indexStats = stats.indexStats?.[backingIndex];
|
||||
const indexMapping = stats.mapping?.[backingIndex]?.mappings;
|
||||
if (!indexMapping) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get all fields from the mapping
|
||||
const indexFieldsMap = getFieldPathsMapFromMapping(indexMapping);
|
||||
const indexFieldsList = Object.keys(indexFieldsMap);
|
||||
indexFieldsList.forEach((field) => uniqueFields.add(field));
|
||||
|
||||
const indexDocCount = indexStats?.primaries?.docs?.count ?? 0;
|
||||
if (!indexDocCount) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const indexStructureLevel = getStructureLevelForFieldsList(stats, indexFieldsMap);
|
||||
structureLevel[indexStructureLevel] =
|
||||
(structureLevel[indexStructureLevel] ?? 0) + indexDocCount;
|
||||
|
||||
for (const field of fieldsToCheck) {
|
||||
if (indexFieldsMap[field]) {
|
||||
resourceFieldCounts[field] = (resourceFieldCounts[field] ?? 0) + indexDocCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
...stats,
|
||||
uniqueFields: Array.from(uniqueFields),
|
||||
structureLevel,
|
||||
fieldsCount: resourceFieldCounts,
|
||||
};
|
||||
}

/**
 * Determines the structure level of log documents based on the fields present in the list.
 *
 * Structure Levels:
 * - Level 0: Unstructured data. No `@timestamp` or `timestamp` field.
 * - Level 1: Contains a `@timestamp` or `timestamp` field.
 * - Level 2: Contains any of the resource fields (`host.name`, `service.name`, `host`, `hostname`, `host_name`).
 * - Level 3: Contains `@timestamp`, resource fields, and a `message` field.
 * - Level 4: Index name complies with the pattern of a known shipper, e.g. `logstash-*`, `heartbeat-*`.
 * - Level 5a: Data stream naming scheme exists (`data_stream.dataset`, `data_stream.type`, `data_stream.namespace`).
 * - Level 5b: Contains at least 3 ECS fields or the `ecs.version` field.
 * - Level 6: Part of an integration, managed by a known entity.
 *
 * @param stats - Contains the pattern, shipper and meta info.
 * @param fieldsMap - Dictionary/Map of fields present in the index, with the full path as key.
 * @returns {number} - The structure level of the index.
 */
function getStructureLevelForFieldsList(
  stats: DataStreamStatsPerNamespace,
  fieldsMap: Record<string, boolean>
): number {
  // Check level 1, if @timestamp or timestamp exists
  if (!fieldsMap['@timestamp'] && !fieldsMap.timestamp) {
    return 0;
  }

  // Check level 2, if resource fields exist
  if (!LEVEL_2_RESOURCE_FIELDS.some((field) => fieldsMap[field])) {
    return 1;
  }

  // Check level 3, if the basic structure of a log message exists
  if (
    !fieldsMap['@timestamp'] ||
    !fieldsMap.message ||
    (!fieldsMap['host.name'] && !fieldsMap['service.name'])
  ) {
    return 2;
  }

  // Check level 4 (Shipper is known)
  if (!stats.patternName || stats.patternName === 'generic-logs') {
    return 3;
  }

  // Check level 5a (Data stream scheme exists)
  if (
    !fieldsMap['data_stream.dataset'] ||
    !fieldsMap['data_stream.type'] ||
    !fieldsMap['data_stream.namespace']
  ) {
    // Check level 5b (ECS fields exist)
    const fieldsList = Object.keys(fieldsMap);
    if (
      !fieldsMap['ecs.version'] &&
      intersection(PROMINENT_LOG_ECS_FIELDS, fieldsList).length < 3
    ) {
      return 4;
    }
  }

  // Check level 6 (Index is managed)
  if (!stats.meta?.managed_by && !stats.meta?.managed) {
    return 5;
  }

  // All levels are fulfilled
  return 6;
}
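To make the cascade above concrete, here is a hedged walk-through with a hypothetical fields map; the exact contents of `LEVEL_2_RESOURCE_FIELDS` and `PROMINENT_LOG_ECS_FIELDS` are defined elsewhere in the plugin, so the comments below follow the documented field lists rather than those constants.

```ts
// Hypothetical fields map for a single backing index (keys are full field paths).
const fieldsMap: Record<string, boolean> = {
  '@timestamp': true, // level 1: a timestamp field exists
  'host.name': true, // level 2: a resource field exists
  message: true, // level 3: timestamp + resource field + message
  'data_stream.dataset': true,
  'data_stream.type': true,
  'data_stream.namespace': true, // level 5a: data stream naming scheme exists
};

// Assuming stats.patternName is 'nginx' (a known shipper pattern, so level 4 passes)
// and stats.meta?.managed_by is set, the function would return 6; if the index were
// not managed, it would stop at level 5.
```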

/**
 * Recursively traverses a mapping and returns a dictionary of field paths.
 * Each key in the dictionary represents a full field path in dot notation.
 *
 * @param {MappingPropertyBase} mapping - The mapping to traverse.
 * @returns {Record<string, boolean>} - A dictionary of field paths.
 */
function getFieldPathsMapFromMapping(mapping: MappingPropertyBase): Record<string, boolean> {
  const fieldPathsMap: Record<string, boolean> = {};

  function traverseMapping(nestedMapping: MappingPropertyBase, parentField: string = ''): void {
    for (const [fieldName, field] of Object.entries(nestedMapping.properties ?? {})) {
      const fullFieldName = parentField ? `${parentField}.${fieldName}` : fieldName;
      if ((field as MappingPropertyBase).properties) {
        traverseMapping(field as MappingPropertyBase, fullFieldName);
      } else {
        fieldPathsMap[fullFieldName] = true;
      }
    }
  }

  traverseMapping(mapping);
  return fieldPathsMap;
}
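As a small illustration of the traversal above, a hypothetical mapping with one nested object would flatten to leaf paths only (intermediate objects are recursed into, not recorded):

```ts
// Hypothetical mapping snippet; real mappings come from the get-mapping response.
const mapping = {
  properties: {
    '@timestamp': { type: 'date' },
    host: {
      properties: {
        name: { type: 'keyword' },
      },
    },
  },
};

// getFieldPathsMapFromMapping(mapping) would return:
// { '@timestamp': true, 'host.name': true }
```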

@@ -0,0 +1,87 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import {
  IndexName,
  IndicesGetMappingResponse,
  IndicesStatsResponse,
  Metadata,
} from '@elastic/elasticsearch/lib/api/types';

export interface DatasetIndexPattern {
  pattern: string;
  patternName: string;
  shipper?: string;
}

export interface IndexBasicInfo {
  patternName: string;
  shipper?: string;
  isDataStream: boolean;
  name: string;
  indices: IndexName[];
  meta?: Metadata;
  mapping?: IndicesGetMappingResponse;
  namespace?: string;
}

export interface DataStreamStatsPerNamespace {
  patternName: string;
  shipper?: string;
  namespace?: string;
  totalDocuments: number;
  totalSize: number;
  totalIndices: number;
  failureStoreDocuments: number;
  failureStoreIndices: number;
  meta?: Metadata;
  mapping?: IndicesGetMappingResponse;
  indexStats: IndicesStatsResponse['indices'];
  failureStoreStats: IndicesStatsResponse['indices'];
}

export interface DataStreamStatsWithLevelsPerNamespace extends DataStreamStatsPerNamespace {
  structureLevel: Record<number, number>;
}

export interface DataStreamFieldStatsPerNamespace extends DataStreamStatsWithLevelsPerNamespace {
  uniqueFields: string[];
  fieldsCount: Record<string, number>;
}

export interface DataStreamStats {
  streamName: string;
  shipper?: string;
  totalNamespaces: number;
  totalDocuments: number;
  structureLevel: Record<number, number>;
  failureStoreDocuments: number;
  failureStoreIndices: number;
  totalSize: number;
  totalIndices: number;
  totalFields: number;
  fieldsCount: Record<string, number>;
  managedBy: string[];
  packageName: string[];
  beat: string[];
}

export interface DataTelemetryEvent {
  pattern_name: string;
  shipper?: string;
  doc_count: number;
  structure_level: Record<string, number>;
  failure_store_doc_count: number;
  index_count: number;
  namespace_count: number;
  field_count: number;
  field_existence: Record<string, number>;
  size_in_bytes: number;
  managed_by: string[];
  package_name: string[];
  beat: string[];
}

@@ -8,3 +8,4 @@
export { dataStreamService } from './data_stream';
export { indexStatsService } from './index_stats';
export { datasetQualityPrivileges } from './privileges';
export { DataTelemetryService } from './data_telemetry/data_telemetry_service';

@@ -6,14 +6,26 @@
 */

import { CustomRequestHandlerContext } from '@kbn/core/server';
import { FleetSetupContract, FleetStartContract } from '@kbn/fleet-plugin/server';
import type { AnalyticsServiceSetup, AnalyticsServiceStart } from '@kbn/core-analytics-server';
import type { FleetSetupContract, FleetStartContract } from '@kbn/fleet-plugin/server';
import {
  TaskManagerSetupContract,
  TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import type { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server';

export interface DatasetQualityPluginSetupDependencies {
  fleet: FleetSetupContract;
  analytics: AnalyticsServiceSetup;
  telemetry: TelemetryPluginSetup;
  taskManager: TaskManagerSetupContract;
}

export interface DatasetQualityPluginStartDependencies {
  fleet: FleetStartContract;
  telemetry: TelemetryPluginStart;
  analytics: AnalyticsServiceStart;
  taskManager: TaskManagerStartContract;
}

// eslint-disable-next-line @typescript-eslint/no-empty-interface

@@ -53,10 +53,13 @@
    "@kbn/ebt-tools",
    "@kbn/fields-metadata-plugin",
    "@kbn/server-route-repository-utils",
    "@kbn/core-analytics-server",
    "@kbn/core-analytics-browser",
    "@kbn/core-lifecycle-browser",
    "@kbn/core-notifications-browser",
    "@kbn/rison"
    "@kbn/telemetry-plugin",
    "@kbn/rison",
    "@kbn/task-manager-plugin"
  ],
  "exclude": [
    "target/**/*"

@@ -146,6 +146,7 @@ export default function ({ getService }: FtrProviderContext) {
      'fleet:unenroll_action:retry',
      'fleet:update_agent_tags:retry',
      'fleet:upgrade_action:retry',
      'logs-data-telemetry',
      'observabilityAIAssistant:indexQueuedDocumentsTaskType',
      'osquery:telemetry-configs',
      'osquery:telemetry-packs',
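The FTR test above expects a `logs-data-telemetry` task type to be registered with Task Manager. As a rough, hedged sketch (not the PR's actual service code), the setup/start contracts added to the plugin dependencies are typically wired up like this; the runner body, task id, and interval are illustrative assumptions:

```ts
import type {
  TaskManagerSetupContract,
  TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';

const TASK_TYPE = 'logs-data-telemetry';

// During plugin setup: declare the task type and its runner.
export function registerLogsDataTelemetryTask(taskManager: TaskManagerSetupContract) {
  taskManager.registerTaskDefinitions({
    [TASK_TYPE]: {
      title: 'Logs data telemetry',
      createTaskRunner: () => ({
        // The real runner would collect index/mapping stats and build telemetry events.
        run: async () => {},
      }),
    },
  });
}

// During plugin start: make sure a recurring instance of the task exists.
export async function scheduleLogsDataTelemetryTask(taskManager: TaskManagerStartContract) {
  await taskManager.ensureScheduled({
    id: `${TASK_TYPE}:singleton`, // illustrative id
    taskType: TASK_TYPE,
    schedule: { interval: '24h' }, // illustrative interval
    params: {},
    state: {},
  });
}
```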