[8.x] [Dataset quality] Extracting totalDocs from degradedDocs request (#198757) (#199177)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Dataset quality] Extracting totalDocs from degradedDocs request
(#198757)](https://github.com/elastic/kibana/pull/198757)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Yngrid
Coello","email":"yngrid.coello@elastic.co"},"sourceCommit":{"committedDate":"2024-11-06T16:29:09Z","message":"[Dataset
quality] Extracting totalDocs form degradedDocs request
(#198757)\n\nRelates to
https://github.com/elastic/logs-dev/issues/183\r\n\r\n## Summary\r\nThis
PR aims to split out `total_docs` from `degraded_docs` request.\r\nThis
number is no longer relevant only for degraded docs. This PR is
a\r\npreparation step for supporting `failed_docs`.\r\n\r\n### 🎥 Demo
\r\n\r\nhttps://github.com/user-attachments/assets/7a826715-64e2-4799-8b54-934698df56e2\r\n\r\n####
When no documents are found in the selected
timerange\r\n\r\nhttps://github.com/user-attachments/assets/de974125-cf45-42d3-932f-32e43b282eb2\r\n\r\n####
Filtering
datasets\r\n\r\nhttps://github.com/user-attachments/assets/398fc7db-1e38-4998-9ecb-10e8644f812d\r\n\r\n###
TODO\r\n\r\n- [ ] Test in MKI before
merging","sha":"3e0ec510fab85d3b7346eb88b8a3bed0dc5b73f5","branchLabelMapping":{"^v9.0.0$":"main","^v8.17.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","v9.0.0","backport:prev-minor","ci:project-deploy-observability"],"title":"[Dataset
quality] Extracting totalDocs form degradedDocs
request","number":198757,"url":"https://github.com/elastic/kibana/pull/198757","mergeCommit":{"message":"[Dataset
quality] Extracting totalDocs form degradedDocs request
(#198757)\n\nRelates to
https://github.com/elastic/logs-dev/issues/183\r\n\r\n## Summary\r\nThis
PR aims to split out `total_docs` from `degraded_docs` request.\r\nThis
number is no longer relevant only for degraded docs. This PR is
a\r\npreparation step for supporting `failed_docs`.\r\n\r\n### 🎥 Demo
\r\n\r\nhttps://github.com/user-attachments/assets/7a826715-64e2-4799-8b54-934698df56e2\r\n\r\n####
When no documents are found in the selected
timerange\r\n\r\nhttps://github.com/user-attachments/assets/de974125-cf45-42d3-932f-32e43b282eb2\r\n\r\n####
Filtering
datasets\r\n\r\nhttps://github.com/user-attachments/assets/398fc7db-1e38-4998-9ecb-10e8644f812d\r\n\r\n###
TODO\r\n\r\n- [ ] Test in MKI before
merging","sha":"3e0ec510fab85d3b7346eb88b8a3bed0dc5b73f5"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/198757","number":198757,"mergeCommit":{"message":"[Dataset
quality] Extracting totalDocs form degradedDocs request
(#198757)\n\nRelates to
https://github.com/elastic/logs-dev/issues/183\r\n\r\n## Summary\r\nThis
PR aims to split out `total_docs` from `degraded_docs` request.\r\nThis
number is no longer relevant only for degraded docs. This PR is
a\r\npreparation step for supporting `failed_docs`.\r\n\r\n### 🎥 Demo
\r\n\r\nhttps://github.com/user-attachments/assets/7a826715-64e2-4799-8b54-934698df56e2\r\n\r\n####
When no documents are found in the selected
timerange\r\n\r\nhttps://github.com/user-attachments/assets/de974125-cf45-42d3-932f-32e43b282eb2\r\n\r\n####
Filtering
datasets\r\n\r\nhttps://github.com/user-attachments/assets/398fc7db-1e38-4998-9ecb-10e8644f812d\r\n\r\n###
TODO\r\n\r\n- [ ] Test in MKI before
merging","sha":"3e0ec510fab85d3b7346eb88b8a3bed0dc5b73f5"}}]}]
BACKPORT-->

---------

Co-authored-by: Yngrid Coello <yngrid.coello@elastic.co>
This commit is contained in:
Kibana Machine 2024-11-07 23:26:08 +11:00 committed by GitHub
parent 27c3d9a6dc
commit 31e0899604
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 816 additions and 494 deletions

View file

@ -37,6 +37,25 @@ export const dataStreamStatRt = rt.intersection([
export type DataStreamStat = rt.TypeOf<typeof dataStreamStatRt>;
export const dataStreamDocsStatRt = rt.type({
dataset: rt.string,
count: rt.number,
});
export type DataStreamDocsStat = rt.TypeOf<typeof dataStreamDocsStatRt>;
export const getDataStreamTotalDocsResponseRt = rt.type({
totalDocs: rt.array(dataStreamDocsStatRt),
});
export type DataStreamTotalDocsResponse = rt.TypeOf<typeof getDataStreamTotalDocsResponseRt>;
export const getDataStreamDegradedDocsResponseRt = rt.type({
degradedDocs: rt.array(dataStreamDocsStatRt),
});
export type DataStreamDegradedDocsResponse = rt.TypeOf<typeof getDataStreamDegradedDocsResponseRt>;
export const integrationDashboardRT = rt.type({
id: rt.string,
title: rt.string,
@ -84,15 +103,6 @@ export const getIntegrationsResponseRt = rt.exact(
export type IntegrationResponse = rt.TypeOf<typeof getIntegrationsResponseRt>;
export const degradedDocsRt = rt.type({
dataset: rt.string,
count: rt.number,
docsCount: rt.number,
percentage: rt.number,
});
export type DegradedDocs = rt.TypeOf<typeof degradedDocsRt>;
export const degradedFieldRt = rt.type({
name: rt.string,
count: rt.number,
@ -188,12 +198,6 @@ export const getDataStreamsStatsResponseRt = rt.exact(
})
);
export const getDataStreamsDegradedDocsStatsResponseRt = rt.exact(
rt.type({
degradedDocs: rt.array(degradedDocsRt),
})
);
export const getDataStreamsSettingsResponseRt = rt.exact(dataStreamSettingsRt);
export const getDataStreamsDetailsResponseRt = rt.exact(dataStreamDetailsRt);

View file

@ -11,6 +11,7 @@ export const DATASET_QUALITY_APP_ID = 'dataset_quality';
export const DEFAULT_DATASET_TYPE: DataStreamType = 'logs';
export const DEFAULT_LOGS_DATA_VIEW = 'logs-*-*';
export const DEFAULT_DATASET_QUALITY: QualityIndicators = 'good';
export const POOR_QUALITY_MINIMUM_PERCENTAGE = 3;
export const DEGRADED_QUALITY_MINIMUM_PERCENTAGE = 0;
@ -26,10 +27,8 @@ export const DEFAULT_TIME_RANGE = { from: 'now-24h', to: 'now' };
export const DEFAULT_DATEPICKER_REFRESH = { value: 60000, pause: false };
export const DEFAULT_DEGRADED_DOCS = {
percentage: 0,
count: 0,
docsCount: 0,
quality: 'good' as QualityIndicators,
percentage: 0,
};
export const NUMBER_FORMAT = '0,0.[000]';

View file

@ -5,11 +5,11 @@
* 2.0.
*/
import { DEFAULT_DEGRADED_DOCS } from '../constants';
import { DataStreamDocsStat } from '../api_types';
import { DEFAULT_DATASET_QUALITY, DEFAULT_DEGRADED_DOCS } from '../constants';
import { DataStreamType, QualityIndicators } from '../types';
import { indexNameToDataStreamParts, mapPercentageToQuality } from '../utils';
import { Integration } from './integration';
import { DegradedDocsStat } from './malformed_docs_stat';
import { DataStreamStatType } from './types';
export class DataStreamStat {
@ -24,11 +24,11 @@ export class DataStreamStat {
userPrivileges?: DataStreamStatType['userPrivileges'];
totalDocs?: DataStreamStatType['totalDocs']; // total datastream docs count
integration?: Integration;
quality: QualityIndicators;
docsInTimeRange?: number;
degradedDocs: {
percentage: number;
count: number;
docsCount: number; // docs count in the filtered time range
quality: QualityIndicators;
};
private constructor(dataStreamStat: DataStreamStat) {
@ -43,12 +43,9 @@ export class DataStreamStat {
this.userPrivileges = dataStreamStat.userPrivileges;
this.totalDocs = dataStreamStat.totalDocs;
this.integration = dataStreamStat.integration;
this.degradedDocs = {
percentage: dataStreamStat.degradedDocs.percentage,
count: dataStreamStat.degradedDocs.count,
docsCount: dataStreamStat.degradedDocs.docsCount,
quality: dataStreamStat.degradedDocs.quality,
};
this.quality = dataStreamStat.quality;
this.docsInTimeRange = dataStreamStat.docsInTimeRange;
this.degradedDocs = dataStreamStat.degradedDocs;
}
public static create(dataStreamStat: DataStreamStatType) {
@ -65,6 +62,7 @@ export class DataStreamStat {
lastActivity: dataStreamStat.lastActivity,
userPrivileges: dataStreamStat.userPrivileges,
totalDocs: dataStreamStat.totalDocs,
quality: DEFAULT_DATASET_QUALITY,
degradedDocs: DEFAULT_DEGRADED_DOCS,
};
@ -74,9 +72,11 @@ export class DataStreamStat {
public static fromDegradedDocStat({
degradedDocStat,
datasetIntegrationMap,
totalDocs,
}: {
degradedDocStat: DegradedDocsStat;
degradedDocStat: DataStreamDocsStat & { percentage: number };
datasetIntegrationMap: Record<string, { integration: Integration; title: string }>;
totalDocs: number;
}) {
const { type, dataset, namespace } = indexNameToDataStreamParts(degradedDocStat.dataset);
@ -87,19 +87,23 @@ export class DataStreamStat {
title: datasetIntegrationMap[dataset]?.title || dataset,
namespace,
integration: datasetIntegrationMap[dataset]?.integration,
quality: mapPercentageToQuality(degradedDocStat.percentage),
docsInTimeRange: totalDocs,
degradedDocs: {
percentage: degradedDocStat.percentage,
count: degradedDocStat.count,
docsCount: degradedDocStat.docsCount,
quality: mapPercentageToQuality(degradedDocStat.percentage),
},
};
return new DataStreamStat(dataStreamStatProps);
}
public static calculateFilteredSize({ sizeBytes, totalDocs, degradedDocs }: DataStreamStat) {
public static calculateFilteredSize({ sizeBytes, totalDocs, docsInTimeRange }: DataStreamStat) {
const avgDocSize = sizeBytes && totalDocs ? sizeBytes / totalDocs : 0;
return avgDocSize * degradedDocs.docsCount;
return avgDocSize * (docsInTimeRange ?? 0);
}
public static calculatePercentage({ totalDocs, count }: { totalDocs?: number; count?: number }) {
return totalDocs && count ? (count / totalDocs) * 100 : 0;
}
}

View file

@ -1,31 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { QualityIndicators } from '../types';
import { mapPercentageToQuality } from '../utils';
import { DegradedDocsStatType } from './types';
export class DegradedDocsStat {
dataset: DegradedDocsStatType['dataset'];
percentage: DegradedDocsStatType['percentage'];
count: DegradedDocsStatType['count'];
docsCount: DegradedDocsStatType['docsCount'];
quality: QualityIndicators;
private constructor(degradedDocsStat: DegradedDocsStat) {
this.dataset = degradedDocsStat.dataset;
this.percentage = degradedDocsStat.percentage;
this.count = degradedDocsStat.count;
this.docsCount = degradedDocsStat.docsCount;
this.quality = degradedDocsStat.quality;
}
public static create(degradedDocsStat: DegradedDocsStatType) {
const quality = mapPercentageToQuality(degradedDocsStat.percentage);
return new DegradedDocsStat({ ...degradedDocsStat, quality });
}
}

View file

@ -18,10 +18,14 @@ export type DataStreamStatServiceResponse = GetDataStreamsStatsResponse;
export type GetDataStreamsDegradedDocsStatsParams =
APIClientRequestParamsOf<`GET /internal/dataset_quality/data_streams/degraded_docs`>['params'];
export type GetDataStreamsDegradedDocsStatsQuery = GetDataStreamsDegradedDocsStatsParams['query'];
export type GetDataStreamsDegradedDocsStatsResponse =
APIReturnType<`GET /internal/dataset_quality/data_streams/degraded_docs`>;
export type DegradedDocsStatType = GetDataStreamsDegradedDocsStatsResponse['degradedDocs'][0];
export type DataStreamDegradedDocsStatServiceResponse = DegradedDocsStatType[];
/*
Types for stats based in documents inside a DataStream
*/
export type GetDataStreamsTotalDocsParams =
APIClientRequestParamsOf<`GET /internal/dataset_quality/data_streams/total_docs`>['params'];
export type GetDataStreamsTotalDocsQuery = GetDataStreamsTotalDocsParams['query'];
/*
Types for Degraded Fields inside a DataStream

View file

@ -19,9 +19,7 @@ export const DatasetQualityIndicator = ({
isLoading: boolean;
dataStreamStat: DataStreamStat;
}) => {
const {
degradedDocs: { quality },
} = dataStreamStat;
const { quality } = dataStreamStat;
const translatedQuality = i18n.translate('xpack.datasetQuality.datasetQualityIdicator', {
defaultMessage: '{quality}',

View file

@ -49,7 +49,7 @@ export const useDatasetQualityFilters = () => {
datasets.reduce(
(acc: Filters, dataset) => ({
namespaces: [...new Set([...acc.namespaces, dataset.namespace])],
qualities: [...new Set([...acc.qualities, dataset.degradedDocs.quality])],
qualities: [...new Set([...acc.qualities, dataset.quality])],
filteredIntegrations: [
...new Set([...acc.filteredIntegrations, dataset.integration?.name ?? 'none']),
],

View file

@ -132,8 +132,7 @@ export const useDatasetQualityTable = () => {
const passesNamespaceFilter =
namespaces.length === 0 || namespaces.includes(dataset.namespace);
const passesQualityFilter =
qualities.length === 0 || qualities.includes(dataset.degradedDocs.quality);
const passesQualityFilter = qualities.length === 0 || qualities.includes(dataset.quality);
const passesQueryFilter = !query || dataset.rawName.includes(query);

View file

@ -77,7 +77,7 @@ function getDatasetEbtProps(
namespace: dataset.namespace,
type: dataset.type,
},
data_stream_health: dataset.degradedDocs.quality,
data_stream_health: dataset.quality,
data_stream_aggregatable: nonAggregatableDatasets.some(
(indexName) => indexName === dataset.rawName
),

View file

@ -84,7 +84,7 @@ const useSummaryPanel = () => {
datasetsActivity,
numberOfDatasets: filteredItems.length,
numberOfDocuments: filteredItems.reduce((acc, curr) => acc + curr.degradedDocs.docsCount, 0),
numberOfDocuments: filteredItems.reduce((acc, curr) => acc + curr.docsInTimeRange!, 0),
};
};

View file

@ -10,8 +10,11 @@ import { decodeOrThrow } from '@kbn/io-ts-utils';
import rison from '@kbn/rison';
import { KNOWN_TYPES } from '../../../common/constants';
import {
getDataStreamsDegradedDocsStatsResponseRt,
DataStreamDegradedDocsResponse,
DataStreamTotalDocsResponse,
getDataStreamDegradedDocsResponseRt,
getDataStreamsStatsResponseRt,
getDataStreamTotalDocsResponseRt,
getIntegrationsResponseRt,
getNonAggregatableDatasetsRt,
IntegrationResponse,
@ -20,9 +23,9 @@ import {
import {
DataStreamStatServiceResponse,
GetDataStreamsDegradedDocsStatsQuery,
GetDataStreamsDegradedDocsStatsResponse,
GetDataStreamsStatsQuery,
GetDataStreamsStatsResponse,
GetDataStreamsTotalDocsQuery,
GetNonAggregatableDataStreamsParams,
} from '../../../common/data_streams_stats';
import { Integration } from '../../../common/data_streams_stats/integration';
@ -56,16 +59,37 @@ export class DataStreamsStatsClient implements IDataStreamsStatsClient {
return { dataStreamsStats, datasetUserPrivileges };
}
public async getDataStreamsDegradedStats(params: GetDataStreamsDegradedDocsStatsQuery) {
public async getDataStreamsTotalDocs(params: GetDataStreamsTotalDocsQuery) {
const response = await this.http
.get<GetDataStreamsDegradedDocsStatsResponse>(
'/internal/dataset_quality/data_streams/degraded_docs',
{
query: {
...params,
},
}
)
.get<DataStreamTotalDocsResponse>('/internal/dataset_quality/data_streams/total_docs', {
query: {
...params,
},
})
.catch((error) => {
throw new DatasetQualityError(`Failed to fetch data streams total docs: ${error}`, error);
});
const { totalDocs } = decodeOrThrow(
getDataStreamTotalDocsResponseRt,
(message: string) =>
new DatasetQualityError(
`Failed to decode data streams total docs stats response: ${message}`
)
)(response);
return totalDocs;
}
public async getDataStreamsDegradedStats(params: GetDataStreamsDegradedDocsStatsQuery) {
const types = params.types.length === 0 ? KNOWN_TYPES : params.types;
const response = await this.http
.get<DataStreamDegradedDocsResponse>('/internal/dataset_quality/data_streams/degraded_docs', {
query: {
...params,
types: rison.encodeArray(types),
},
})
.catch((error) => {
throw new DatasetQualityError(
`Failed to fetch data streams degraded stats: ${error}`,
@ -74,7 +98,7 @@ export class DataStreamsStatsClient implements IDataStreamsStatsClient {
});
const { degradedDocs } = decodeOrThrow(
getDataStreamsDegradedDocsStatsResponseRt,
getDataStreamDegradedDocsResponseRt,
(message: string) =>
new DatasetQualityError(
`Failed to decode data streams degraded docs stats response: ${message}`

View file

@ -7,14 +7,14 @@
import { HttpStart } from '@kbn/core/public';
import {
DataStreamDegradedDocsStatServiceResponse,
DataStreamStatServiceResponse,
GetDataStreamsDegradedDocsStatsQuery,
GetDataStreamsStatsQuery,
GetDataStreamsTotalDocsQuery,
GetNonAggregatableDataStreamsParams,
} from '../../../common/data_streams_stats';
import { Integration } from '../../../common/data_streams_stats/integration';
import { NonAggregatableDatasets } from '../../../common/api_types';
import { DataStreamDocsStat, NonAggregatableDatasets } from '../../../common/api_types';
export type DataStreamsStatsServiceSetup = void;
@ -30,7 +30,8 @@ export interface IDataStreamsStatsClient {
getDataStreamsStats(params?: GetDataStreamsStatsQuery): Promise<DataStreamStatServiceResponse>;
getDataStreamsDegradedStats(
params?: GetDataStreamsDegradedDocsStatsQuery
): Promise<DataStreamDegradedDocsStatServiceResponse>;
): Promise<DataStreamDocsStat[]>;
getDataStreamsTotalDocs(params: GetDataStreamsTotalDocsQuery): Promise<DataStreamDocsStat[]>;
getIntegrations(): Promise<Integration[]>;
getNonAggregatableDatasets(
params: GetNonAggregatableDataStreamsParams

View file

@ -37,7 +37,8 @@ export const DEFAULT_CONTEXT: DefaultDatasetQualityControllerState = {
canViewIntegrations: true,
},
dataStreamStats: [],
degradedDocStats: DEFAULT_DICTIONARY_TYPE,
degradedDocStats: [],
totalDocsStats: DEFAULT_DICTIONARY_TYPE,
filters: {
inactive: true,
fullNames: false,

View file

@ -7,6 +7,7 @@
import { IToasts } from '@kbn/core/public';
import { i18n } from '@kbn/i18n';
import { DataStreamType } from '../../../../common/types';
export const fetchDatasetStatsFailedNotifier = (toasts: IToasts, error: Error) => {
toasts.addDanger({
@ -26,6 +27,18 @@ export const fetchDegradedStatsFailedNotifier = (toasts: IToasts, error: Error)
});
};
export const fetchTotalDocsFailedNotifier = (toasts: IToasts, error: Error, meta: any) => {
const dataStreamType = meta._event.origin as DataStreamType;
toasts.addDanger({
title: i18n.translate('xpack.datasetQuality.fetchTotalDocsFailed', {
defaultMessage: "We couldn't get total docs information for {dataStreamType}.",
values: { dataStreamType },
}),
text: error.message,
});
};
export const fetchIntegrationsFailedNotifier = (toasts: IToasts, error: Error) => {
toasts.addDanger({
title: i18n.translate('xpack.datasetQuality.fetchIntegrationsFailed', {

View file

@ -8,12 +8,13 @@
import { IToasts } from '@kbn/core/public';
import { getDateISORange } from '@kbn/timerange';
import { assign, createMachine, DoneInvokeEvent, InterpreterFrom } from 'xstate';
import { DataStreamStat, NonAggregatableDatasets } from '../../../../common/api_types';
import { KNOWN_TYPES } from '../../../../common/constants';
import {
DataStreamDegradedDocsStatServiceResponse,
DataStreamStatServiceResponse,
} from '../../../../common/data_streams_stats';
DataStreamDocsStat,
DataStreamStat,
NonAggregatableDatasets,
} from '../../../../common/api_types';
import { KNOWN_TYPES } from '../../../../common/constants';
import { DataStreamStatServiceResponse } from '../../../../common/data_streams_stats';
import { Integration } from '../../../../common/data_streams_stats/integration';
import { DataStreamType } from '../../../../common/types';
import { IDataStreamsStatsClient } from '../../../services/data_streams_stats';
@ -24,6 +25,7 @@ import {
fetchDatasetStatsFailedNotifier,
fetchDegradedStatsFailedNotifier,
fetchIntegrationsFailedNotifier,
fetchTotalDocsFailedNotifier,
} from './notifications';
import {
DatasetQualityControllerContext,
@ -92,28 +94,28 @@ export const createPureDatasetQualityControllerStateMachine = (
initial: 'fetching',
states: {
fetching: {
...generateInvokePerType({
invoke: {
src: 'loadDegradedDocs',
}),
onDone: {
target: 'loaded',
actions: ['storeDegradedDocStats', 'storeDatasets'],
},
onError: [
{
target: 'unauthorized',
cond: 'checkIfActionForbidden',
},
{
target: 'loaded',
actions: ['notifyFetchDegradedStatsFailed'],
},
],
},
},
loaded: {},
unauthorized: { type: 'final' },
},
on: {
SAVE_DEGRADED_DOCS_STATS: {
target: 'degradedDocs.loaded',
actions: ['storeDegradedDocStats', 'storeDatasets'],
},
NOTIFY_DEGRADED_DOCS_STATS_FAILED: [
{
target: 'degradedDocs.unauthorized',
cond: 'checkIfActionForbidden',
},
{
target: 'degradedDocs.loaded',
actions: ['notifyFetchDegradedStatsFailed'],
},
],
UPDATE_TIME_RANGE: {
target: 'degradedDocs.fetching',
actions: ['storeTimeRange'],
@ -123,6 +125,41 @@ export const createPureDatasetQualityControllerStateMachine = (
},
},
},
docsStats: {
initial: 'fetching',
states: {
fetching: {
...generateInvokePerType({
src: 'loadDataStreamDocsStats',
}),
},
loaded: {},
unauthorized: { type: 'final' },
},
on: {
SAVE_TOTAL_DOCS_STATS: {
target: 'docsStats.loaded',
actions: ['storeTotalDocStats', 'storeDatasets'],
},
NOTIFY_TOTAL_DOCS_STATS_FAILED: [
{
target: 'docsStats.unauthorized',
cond: 'checkIfActionForbidden',
},
{
target: 'docsStats.loaded',
actions: ['notifyFetchTotalDocsFailed'],
},
],
UPDATE_TIME_RANGE: {
target: 'docsStats.fetching',
actions: ['storeTimeRange'],
},
REFRESH_DATA: {
target: 'docsStats.fetching',
},
},
},
nonAggregatableDatasets: {
initial: 'fetching',
states: {
@ -329,18 +366,21 @@ export const createPureDatasetQualityControllerStateMachine = (
};
}
),
storeDegradedDocStats: assign(
(context, event: DoneInvokeEvent<DataStreamDegradedDocsStatServiceResponse>, meta) => {
storeTotalDocStats: assign(
(context, event: DoneInvokeEvent<DataStreamDocsStat[]>, meta) => {
const type = meta._event.origin as DataStreamType;
return {
degradedDocStats: {
...context.degradedDocStats,
totalDocsStats: {
...context.totalDocsStats,
[type]: event.data,
},
};
}
),
storeDegradedDocStats: assign((_context, event: DoneInvokeEvent<DataStreamDocsStat[]>) => ({
degradedDocStats: event.data,
})),
storeNonAggregatableDatasets: assign(
(_context, event: DoneInvokeEvent<NonAggregatableDatasets>) => ({
nonAggregatableDatasets: event.data.datasets,
@ -364,7 +404,8 @@ export const createPureDatasetQualityControllerStateMachine = (
datasets: generateDatasets(
context.dataStreamStats,
context.degradedDocStats,
context.integrations
context.integrations,
context.totalDocsStats
),
}
: {};
@ -404,6 +445,8 @@ export const createDatasetQualityControllerStateMachine = ({
fetchNonAggregatableDatasetsFailedNotifier(toasts, event.data),
notifyFetchIntegrationsFailed: (_context, event: DoneInvokeEvent<Error>) =>
fetchIntegrationsFailedNotifier(toasts, event.data),
notifyFetchTotalDocsFailed: (_context, event: DoneInvokeEvent<Error>, meta) =>
fetchTotalDocsFailedNotifier(toasts, event.data, meta),
},
services: {
loadDataStreamStats: (context, _event) =>
@ -411,32 +454,41 @@ export const createDatasetQualityControllerStateMachine = ({
types: context.filters.types as DataStreamType[],
datasetQuery: context.filters.query,
}),
loadDegradedDocs:
loadDataStreamDocsStats:
(context, _event, { data: { type } }) =>
async (send) => {
try {
const { startDate: start, endDate: end } = getDateISORange(context.filters.timeRange);
const degradedDocsStats = await (isTypeSelected(type, context)
? dataStreamStatsClient.getDataStreamsDegradedStats({
const totalDocsStats = await (isTypeSelected(type, context)
? dataStreamStatsClient.getDataStreamsTotalDocs({
type,
datasetQuery: context.filters.query,
start,
end,
})
: Promise.resolve([]));
send({
type: 'SAVE_DEGRADED_DOCS_STATS',
data: degradedDocsStats,
type: 'SAVE_TOTAL_DOCS_STATS',
data: totalDocsStats,
});
} catch (e) {
send({
type: 'NOTIFY_DEGRADED_DOCS_STATS_FAILED',
type: 'NOTIFY_TOTAL_DOCS_STATS_FAILED',
data: e,
});
}
},
loadDegradedDocs: (context) => {
const { startDate: start, endDate: end } = getDateISORange(context.filters.timeRange);
return dataStreamStatsClient.getDataStreamsDegradedStats({
types: context.filters.types as DataStreamType[],
datasetQuery: context.filters.query,
start,
end,
});
},
loadNonAggregatableDatasets: (context) => {
const { startDate: start, endDate: end } = getDateISORange(context.filters.timeRange);

View file

@ -6,16 +6,18 @@
*/
import { DoneInvokeEvent } from 'xstate';
import { DatasetUserPrivileges, NonAggregatableDatasets } from '../../../../common/api_types';
import {
DataStreamDegradedDocsStatServiceResponse,
DataStreamDocsStat,
DatasetUserPrivileges,
NonAggregatableDatasets,
} from '../../../../common/api_types';
import {
DataStreamDetails,
DataStreamStat,
DataStreamStatServiceResponse,
DataStreamStatType,
} from '../../../../common/data_streams_stats';
import { Integration } from '../../../../common/data_streams_stats/integration';
import { DegradedDocsStat } from '../../../../common/data_streams_stats/malformed_docs_stat';
import {
DataStreamType,
QualityIndicators,
@ -50,8 +52,12 @@ export interface WithDataStreamStats {
dataStreamStats: DataStreamStatType[];
}
export interface WithTotalDocs {
totalDocsStats: DictionaryType<DataStreamDocsStat>;
}
export interface WithDegradedDocs {
degradedDocStats: DictionaryType<DegradedDocsStat>;
degradedDocStats: DataStreamDocsStat[];
}
export interface WithNonAggregatableDatasets {
@ -68,6 +74,7 @@ export interface WithIntegrations {
export type DefaultDatasetQualityControllerState = WithTableOptions &
WithDataStreamStats &
WithTotalDocs &
WithDegradedDocs &
WithDatasets &
WithFilters &
@ -146,7 +153,7 @@ export type DatasetQualityControllerEvent =
type: 'UPDATE_TYPES';
types: DataStreamType[];
}
| DoneInvokeEvent<DataStreamDegradedDocsStatServiceResponse>
| DoneInvokeEvent<DataStreamDocsStat[]>
| DoneInvokeEvent<NonAggregatableDatasets>
| DoneInvokeEvent<DataStreamDetails>
| DoneInvokeEvent<DataStreamStatServiceResponse>

View file

@ -5,11 +5,10 @@
* 2.0.
*/
import { indexNameToDataStreamParts } from '../../common/utils';
import { Integration } from '../../common/data_streams_stats/integration';
import { generateDatasets } from './generate_datasets';
import { DataStreamStatType } from '../../common/data_streams_stats';
import { Integration } from '../../common/data_streams_stats/integration';
import { DEFAULT_DICTIONARY_TYPE } from '../state_machines/dataset_quality_controller';
import { generateDatasets } from './generate_datasets';
describe('generateDatasets', () => {
const integrations: Integration[] = [
@ -41,6 +40,7 @@ describe('generateDatasets', () => {
lastActivity: 1712911241117,
size: '82.1kb',
sizeBytes: 84160,
totalDocs: 100,
integration: 'system',
userPrivileges: {
canMonitor: true,
@ -51,182 +51,337 @@ describe('generateDatasets', () => {
lastActivity: 1712911241117,
size: '62.5kb',
sizeBytes: 64066,
totalDocs: 100,
userPrivileges: {
canMonitor: true,
},
},
];
const degradedDocs = {
const totalDocs = {
...DEFAULT_DICTIONARY_TYPE,
logs: [
{
dataset: 'logs-system.application-default',
percentage: 0,
count: 0,
docsCount: 0,
quality: 'good' as const,
count: 100,
},
{
dataset: 'logs-synth-default',
percentage: 11.320754716981131,
count: 6,
docsCount: 0,
quality: 'poor' as const,
count: 100,
},
],
};
it('merges integrations information with dataStreamStats', () => {
const datasets = generateDatasets(dataStreamStats, DEFAULT_DICTIONARY_TYPE, integrations);
const degradedDocs = [
{
dataset: 'logs-system.application-default',
count: 0,
},
{
dataset: 'logs-synth-default',
count: 6,
},
];
it('merges integrations information with dataStreamStats and degradedDocs', () => {
const datasets = generateDatasets(dataStreamStats, degradedDocs, integrations, totalDocs);
expect(datasets).toEqual([
{
...dataStreamStats[0],
name: indexNameToDataStreamParts(dataStreamStats[0].name).dataset,
namespace: indexNameToDataStreamParts(dataStreamStats[0].name).namespace,
title:
integrations[0].datasets[indexNameToDataStreamParts(dataStreamStats[0].name).dataset],
type: indexNameToDataStreamParts(dataStreamStats[0].name).type,
rawName: dataStreamStats[0].name,
name: 'system.application',
type: 'logs',
namespace: 'default',
title: 'Windows Application Events',
rawName: 'logs-system.application-default',
lastActivity: 1712911241117,
size: '82.1kb',
sizeBytes: 84160,
integration: integrations[0],
totalDocs: 100,
userPrivileges: {
canMonitor: true,
},
docsInTimeRange: 100,
quality: 'good',
degradedDocs: {
percentage: degradedDocs.logs[0].percentage,
count: degradedDocs.logs[0].count,
docsCount: degradedDocs.logs[0].docsCount,
quality: degradedDocs.logs[0].quality,
percentage: 0,
count: 0,
},
},
{
...dataStreamStats[1],
name: indexNameToDataStreamParts(dataStreamStats[1].name).dataset,
namespace: indexNameToDataStreamParts(dataStreamStats[1].name).namespace,
title: indexNameToDataStreamParts(dataStreamStats[1].name).dataset,
type: indexNameToDataStreamParts(dataStreamStats[1].name).type,
rawName: dataStreamStats[1].name,
name: 'synth',
type: 'logs',
namespace: 'default',
title: 'synth',
rawName: 'logs-synth-default',
lastActivity: 1712911241117,
size: '62.5kb',
sizeBytes: 64066,
integration: undefined,
totalDocs: 100,
userPrivileges: {
canMonitor: true,
},
docsInTimeRange: 100,
quality: 'poor',
degradedDocs: {
count: 6,
percentage: 6,
},
},
]);
});
it('merges integrations information with dataStreamStats and degradedDocs when no docs in timerange', () => {
const datasets = generateDatasets(
dataStreamStats,
degradedDocs,
integrations,
DEFAULT_DICTIONARY_TYPE
);
expect(datasets).toEqual([
{
name: 'system.application',
type: 'logs',
namespace: 'default',
title: 'Windows Application Events',
rawName: 'logs-system.application-default',
lastActivity: 1712911241117,
size: '82.1kb',
sizeBytes: 84160,
integration: integrations[0],
totalDocs: 100,
userPrivileges: {
canMonitor: true,
},
docsInTimeRange: 0,
quality: 'good',
degradedDocs: {
count: 0,
percentage: 0,
docsCount: 0,
quality: 'good',
count: 0,
},
},
{
name: 'synth',
type: 'logs',
namespace: 'default',
title: 'synth',
rawName: 'logs-synth-default',
lastActivity: 1712911241117,
size: '62.5kb',
sizeBytes: 64066,
integration: undefined,
totalDocs: 100,
userPrivileges: {
canMonitor: true,
},
docsInTimeRange: 0,
quality: 'good',
degradedDocs: {
count: 6,
percentage: 0,
},
},
]);
});
it('merges integrations information with degradedDocs', () => {
const datasets = generateDatasets(undefined, degradedDocs, integrations);
const datasets = generateDatasets([], degradedDocs, integrations, totalDocs);
expect(datasets).toEqual([
{
rawName: degradedDocs.logs[0].dataset,
name: indexNameToDataStreamParts(degradedDocs.logs[0].dataset).dataset,
type: indexNameToDataStreamParts(degradedDocs.logs[0].dataset).type,
name: 'system.application',
type: 'logs',
namespace: 'default',
title: 'Windows Application Events',
rawName: 'logs-system.application-default',
lastActivity: undefined,
size: undefined,
sizeBytes: undefined,
userPrivileges: undefined,
namespace: indexNameToDataStreamParts(degradedDocs.logs[0].dataset).namespace,
title:
integrations[0].datasets[
indexNameToDataStreamParts(degradedDocs.logs[0].dataset).dataset
],
integration: integrations[0],
totalDocs: undefined,
userPrivileges: undefined,
docsInTimeRange: 100,
quality: 'good',
degradedDocs: {
percentage: degradedDocs.logs[0].percentage,
count: degradedDocs.logs[0].count,
docsCount: degradedDocs.logs[0].docsCount,
quality: degradedDocs.logs[0].quality,
percentage: 0,
count: 0,
},
},
{
rawName: degradedDocs.logs[1].dataset,
name: indexNameToDataStreamParts(degradedDocs.logs[1].dataset).dataset,
type: indexNameToDataStreamParts(degradedDocs.logs[1].dataset).type,
name: 'synth',
type: 'logs',
namespace: 'default',
title: 'synth',
rawName: 'logs-synth-default',
lastActivity: undefined,
size: undefined,
sizeBytes: undefined,
userPrivileges: undefined,
namespace: indexNameToDataStreamParts(degradedDocs.logs[1].dataset).namespace,
title: indexNameToDataStreamParts(degradedDocs.logs[1].dataset).dataset,
integration: undefined,
totalDocs: undefined,
userPrivileges: undefined,
docsInTimeRange: 100,
quality: 'poor',
degradedDocs: {
percentage: degradedDocs.logs[1].percentage,
count: degradedDocs.logs[1].count,
docsCount: degradedDocs.logs[1].docsCount,
quality: degradedDocs.logs[1].quality,
count: 6,
percentage: 6,
},
},
]);
});
it('merges integrations information with dataStreamStats and degradedDocs', () => {
const datasets = generateDatasets(dataStreamStats, degradedDocs, integrations);
it('merges integrations information with degradedDocs and totalDocs', () => {
const datasets = generateDatasets([], degradedDocs, integrations, {
...totalDocs,
logs: [...totalDocs.logs, { dataset: 'logs-another-default', count: 100 }],
});
expect(datasets).toEqual([
{
...dataStreamStats[0],
name: indexNameToDataStreamParts(dataStreamStats[0].name).dataset,
namespace: indexNameToDataStreamParts(dataStreamStats[0].name).namespace,
title:
integrations[0].datasets[indexNameToDataStreamParts(dataStreamStats[0].name).dataset],
type: indexNameToDataStreamParts(dataStreamStats[0].name).type,
rawName: dataStreamStats[0].name,
name: 'system.application',
type: 'logs',
namespace: 'default',
title: 'Windows Application Events',
rawName: 'logs-system.application-default',
lastActivity: undefined,
size: undefined,
sizeBytes: undefined,
integration: integrations[0],
totalDocs: undefined,
userPrivileges: undefined,
docsInTimeRange: 100,
quality: 'good',
degradedDocs: {
percentage: degradedDocs.logs[0].percentage,
count: degradedDocs.logs[0].count,
docsCount: degradedDocs.logs[0].docsCount,
quality: degradedDocs.logs[0].quality,
percentage: 0,
count: 0,
},
},
{
...dataStreamStats[1],
name: indexNameToDataStreamParts(dataStreamStats[1].name).dataset,
namespace: indexNameToDataStreamParts(dataStreamStats[1].name).namespace,
title: indexNameToDataStreamParts(dataStreamStats[1].name).dataset,
type: indexNameToDataStreamParts(dataStreamStats[1].name).type,
rawName: dataStreamStats[1].name,
name: 'synth',
type: 'logs',
namespace: 'default',
title: 'synth',
rawName: 'logs-synth-default',
lastActivity: undefined,
size: undefined,
sizeBytes: undefined,
integration: undefined,
totalDocs: undefined,
userPrivileges: undefined,
docsInTimeRange: 100,
quality: 'poor',
degradedDocs: {
percentage: degradedDocs.logs[1].percentage,
count: degradedDocs.logs[1].count,
docsCount: degradedDocs.logs[1].docsCount,
quality: degradedDocs.logs[1].quality,
count: 6,
percentage: 6,
},
},
{
name: 'another',
type: 'logs',
namespace: 'default',
title: 'another',
rawName: 'logs-another-default',
lastActivity: undefined,
size: undefined,
sizeBytes: undefined,
integration: undefined,
totalDocs: undefined,
userPrivileges: undefined,
docsInTimeRange: 100,
quality: 'good',
degradedDocs: {
percentage: 0,
count: 0,
},
},
]);
});
it('merges integrations information with dataStreamStats', () => {
const datasets = generateDatasets(dataStreamStats, [], integrations, totalDocs);
expect(datasets).toEqual([
{
name: 'system.application',
type: 'logs',
namespace: 'default',
title: 'Windows Application Events',
rawName: 'logs-system.application-default',
lastActivity: 1712911241117,
size: '82.1kb',
sizeBytes: 84160,
integration: integrations[0],
totalDocs: 100,
userPrivileges: {
canMonitor: true,
},
quality: 'good',
docsInTimeRange: 100,
degradedDocs: {
count: 0,
percentage: 0,
},
},
{
name: 'synth',
type: 'logs',
namespace: 'default',
title: 'synth',
rawName: 'logs-synth-default',
lastActivity: 1712911241117,
size: '62.5kb',
sizeBytes: 64066,
integration: undefined,
totalDocs: 100,
userPrivileges: {
canMonitor: true,
},
quality: 'good',
docsInTimeRange: 100,
degradedDocs: {
count: 0,
percentage: 0,
},
},
]);
});
it('merges integration information with dataStreamStats when dataset is not an integration default one', () => {
const dataset = 'logs-system.custom-default';
const nonDefaultDataset = {
name: dataset,
name: 'logs-system.custom-default',
lastActivity: 1712911241117,
size: '82.1kb',
sizeBytes: 84160,
totalDocs: 100,
integration: 'system',
userPrivileges: {
canMonitor: true,
},
};
const datasets = generateDatasets([nonDefaultDataset], DEFAULT_DICTIONARY_TYPE, integrations);
const datasets = generateDatasets([nonDefaultDataset], [], integrations, totalDocs);
expect(datasets).toEqual([
{
...nonDefaultDataset,
title: indexNameToDataStreamParts(dataset).dataset,
name: indexNameToDataStreamParts(dataset).dataset,
namespace: indexNameToDataStreamParts(dataset).namespace,
type: indexNameToDataStreamParts(dataset).type,
rawName: nonDefaultDataset.name,
name: 'system.custom',
type: 'logs',
namespace: 'default',
title: 'system.custom',
rawName: 'logs-system.custom-default',
lastActivity: 1712911241117,
size: '82.1kb',
sizeBytes: 84160,
integration: integrations[0],
userPrivileges: {
canMonitor: true,
},
quality: 'good',
totalDocs: 100,
docsInTimeRange: 0,
degradedDocs: {
count: 0,
percentage: 0,
docsCount: 0,
quality: 'good',
},
},
]);

View file

@ -5,23 +5,20 @@
* 2.0.
*/
import { DEFAULT_DEGRADED_DOCS } from '../../common/constants';
import { DataStreamDocsStat } from '../../common/api_types';
import { DataStreamStatType } from '../../common/data_streams_stats/types';
import { mapPercentageToQuality } from '../../common/utils';
import { Integration } from '../../common/data_streams_stats/integration';
import { DataStreamStat } from '../../common/data_streams_stats/data_stream_stat';
import { DegradedDocsStat } from '../../common/data_streams_stats/malformed_docs_stat';
import { DictionaryType } from '../state_machines/dataset_quality_controller/src/types';
import { flattenStats } from './flatten_stats';
export function generateDatasets(
dataStreamStats: DataStreamStatType[] = [],
degradedDocStats: DictionaryType<DegradedDocsStat>,
integrations: Integration[]
degradedDocStats: DataStreamDocsStat[] = [],
integrations: Integration[],
totalDocsStats: DictionaryType<DataStreamDocsStat>
): DataStreamStat[] {
if (!dataStreamStats.length && !integrations.length) {
return [];
}
const {
datasetIntegrationMap,
integrationsMap,
@ -50,35 +47,42 @@ export function generateDatasets(
{ datasetIntegrationMap: {}, integrationsMap: {} }
);
const degradedDocs = flattenStats(degradedDocStats);
if (!dataStreamStats.length) {
return degradedDocs.map((degradedDocStat) =>
DataStreamStat.fromDegradedDocStat({ degradedDocStat, datasetIntegrationMap })
);
}
const totalDocs = flattenStats(totalDocsStats);
const totalDocsMap: Record<DataStreamDocsStat['dataset'], DataStreamDocsStat['count']> =
Object.fromEntries(totalDocs.map(({ dataset, count }) => [dataset, count]));
const degradedMap: Record<
DegradedDocsStat['dataset'],
DataStreamDocsStat['dataset'],
{
percentage: DegradedDocsStat['percentage'];
count: DegradedDocsStat['count'];
docsCount: DegradedDocsStat['docsCount'];
quality: DegradedDocsStat['quality'];
percentage: number;
count: DataStreamDocsStat['count'];
}
> = degradedDocs.reduce(
(degradedMapAcc, { dataset, percentage, count, docsCount }) =>
> = degradedDocStats.reduce(
(degradedMapAcc, { dataset, count }) =>
Object.assign(degradedMapAcc, {
[dataset]: {
percentage,
count,
docsCount,
quality: mapPercentageToQuality(percentage),
percentage: DataStreamStat.calculatePercentage({
totalDocs: totalDocsMap[dataset],
count,
}),
},
}),
{}
);
if (!dataStreamStats.length) {
// We want to pick up all datasets even when they don't have degraded docs
const dataStreams = [...new Set([...Object.keys(totalDocsMap), ...Object.keys(degradedMap)])];
return dataStreams.map((dataset) =>
DataStreamStat.fromDegradedDocStat({
degradedDocStat: { dataset, ...(degradedMap[dataset] || DEFAULT_DEGRADED_DOCS) },
datasetIntegrationMap,
totalDocs: totalDocsMap[dataset] ?? 0,
})
);
}
return dataStreamStats?.map((dataStream) => {
const dataset = DataStreamStat.create(dataStream);
@ -89,6 +93,10 @@ export function generateDatasets(
datasetIntegrationMap[dataset.name]?.integration ??
integrationsMap[dataStream.integration ?? ''],
degradedDocs: degradedMap[dataset.rawName] || dataset.degradedDocs,
docsInTimeRange: totalDocsMap[dataset.rawName] ?? 0,
quality: mapPercentageToQuality(
(degradedMap[dataset.rawName] || dataset.degradedDocs).percentage
),
};
});
}

View file

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from '@kbn/core/server';
import { rangeQuery } from '@kbn/observability-plugin/server';
import { QueryDslBoolQuery } from '@elastic/elasticsearch/lib/api/types';
import { DataStreamDocsStat } from '../../../common/api_types';
import { createDatasetQualityESClient } from '../../utils';
// Composite-aggregation key identifying a data stream by its three name parts
// (rendered elsewhere as `${type}-${dataset}-${namespace}`).
interface Dataset {
  type: string;
  dataset: string;
  namespace: string;
}
// Page size for the composite aggregation; also the threshold used to decide
// whether another page must be fetched (a full page implies possibly more buckets).
const SIZE_LIMIT = 10000;
/**
 * Counts documents per data stream within [start, end] by running a composite
 * aggregation over (data_stream.type, data_stream.dataset, data_stream.namespace)
 * on `index`, paginating with the composite `after_key` until a page comes back
 * smaller than SIZE_LIMIT. Results accumulate across recursive calls in
 * `prevResults`.
 *
 * @param options.query optional extra bool query merged with the time-range filter
 * @param options.after resume key for composite pagination (internal recursion use)
 * @returns one { dataset, count } entry per data stream seen in the range
 */
export async function getAggregatedDatasetPaginatedResults(options: {
  esClient: ElasticsearchClient;
  index: string;
  start: number;
  end: number;
  query?: QueryDslBoolQuery;
  after?: Dataset;
  prevResults?: DataStreamDocsStat[];
}): Promise<DataStreamDocsStat[]> {
  const { esClient, index, query, start, end, after, prevResults = [] } = options;

  const datasetQualityESClient = createDatasetQualityESClient(esClient);

  const aggs = (afterKey?: Dataset) => ({
    datasets: {
      composite: {
        ...(afterKey ? { after: afterKey } : {}),
        size: SIZE_LIMIT,
        sources: [
          { type: { terms: { field: 'data_stream.type' } } },
          { dataset: { terms: { field: 'data_stream.dataset' } } },
          { namespace: { terms: { field: 'data_stream.namespace' } } },
        ],
      },
    },
  });

  // Merge the caller's bool query with the time-range filter. `query.filter`
  // may be a single clause or an array, so normalize it into an array first.
  const bool = {
    ...query,
    filter: [
      ...(query?.filter ? (Array.isArray(query.filter) ? query.filter : [query.filter]) : []),
      ...rangeQuery(start, end),
    ],
  };

  const response = await datasetQualityESClient.search({
    index,
    size: 0,
    query: {
      bool,
    },
    aggs: aggs(after),
  });

  const currResults =
    response.aggregations?.datasets.buckets.map((bucket) => ({
      dataset: `${bucket.key.type}-${bucket.key.dataset}-${bucket.key.namespace}`,
      count: bucket.doc_count,
    })) ?? [];

  const results = [...prevResults, ...currResults];

  // A full page means more buckets may remain: recurse from the after_key.
  if (
    response.aggregations?.datasets.after_key &&
    response.aggregations?.datasets.buckets.length === SIZE_LIMIT
  ) {
    return getAggregatedDatasetPaginatedResults({
      esClient,
      index,
      start,
      end,
      // Bug fix: `query` was previously dropped here, so every page after the
      // first was aggregated WITHOUT the caller's filter. Propagate it.
      query,
      after: (response.aggregations?.datasets.after_key as Dataset) || after,
      prevResults: results,
    });
  }

  return results;
}

View file

@ -6,161 +6,37 @@
*/
import type { ElasticsearchClient } from '@kbn/core/server';
import { rangeQuery, termQuery } from '@kbn/observability-plugin/server';
import { DEFAULT_DATASET_TYPE } from '../../../common/constants';
import { streamPartsToIndexPattern } from '../../../common/utils';
import { DataStreamType } from '../../../common/types';
import { DegradedDocs } from '../../../common/api_types';
import {
DATA_STREAM_DATASET,
DATA_STREAM_NAMESPACE,
DATA_STREAM_TYPE,
_IGNORED,
} from '../../../common/es_fields';
import { createDatasetQualityESClient, wildcardQuery } from '../../utils';
interface ResultBucket {
dataset: string;
count: number;
}
const SIZE_LIMIT = 10000;
import { DataStreamDocsStat } from '../../../common/api_types';
import { _IGNORED } from '../../../common/es_fields';
import { getAggregatedDatasetPaginatedResults } from './get_dataset_aggregated_paginated_results';
export async function getDegradedDocsPaginated(options: {
esClient: ElasticsearchClient;
type?: DataStreamType;
types: DataStreamType[];
datasetQuery?: string;
start: number;
end: number;
datasetQuery?: string;
after?: {
degradedDocs?: { dataset: string; namespace: string };
docsCount?: { dataset: string; namespace: string };
};
prevResults?: { degradedDocs: ResultBucket[]; docsCount: ResultBucket[] };
}): Promise<DegradedDocs[]> {
const {
}): Promise<DataStreamDocsStat[]> {
const { esClient, types, datasetQuery, start, end } = options;
const datasetNames = datasetQuery
? [datasetQuery]
: types.map((type) =>
streamPartsToIndexPattern({
typePattern: type,
datasetPattern: '*-*',
})
);
return await getAggregatedDatasetPaginatedResults({
esClient,
type = DEFAULT_DATASET_TYPE,
datasetQuery,
start,
end,
after,
prevResults = { degradedDocs: [], docsCount: [] },
} = options;
const datasetQualityESClient = createDatasetQualityESClient(esClient);
const datasetFilter = {
...(datasetQuery
? {
should: [
...wildcardQuery(DATA_STREAM_DATASET, datasetQuery),
...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery),
],
minimum_should_match: 1,
}
: {}),
};
const otherFilters = [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)];
const aggs = (afterKey?: { dataset: string; namespace: string }) => ({
datasets: {
composite: {
...(afterKey ? { after: afterKey } : {}),
size: SIZE_LIMIT,
sources: [
{ dataset: { terms: { field: 'data_stream.dataset' } } },
{ namespace: { terms: { field: 'data_stream.namespace' } } },
],
},
index: datasetNames.join(','),
query: {
must: { exists: { field: _IGNORED } },
},
});
const response = await datasetQualityESClient.msearch({ index: `${type}-*-*` }, [
// degraded docs per dataset
{
size: 0,
query: {
bool: {
...datasetFilter,
filter: otherFilters,
must: { exists: { field: _IGNORED } },
},
},
aggs: aggs(after?.degradedDocs),
},
// total docs per dataset
{
size: 0,
query: {
bool: {
...datasetFilter,
filter: otherFilters,
},
},
aggs: aggs(after?.docsCount),
},
]);
const [degradedDocsResponse, totalDocsResponse] = response.responses;
const currDegradedDocs =
degradedDocsResponse.aggregations?.datasets.buckets.map((bucket) => ({
dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`,
count: bucket.doc_count,
})) ?? [];
const degradedDocs = [...prevResults.degradedDocs, ...currDegradedDocs];
const currTotalDocs =
totalDocsResponse.aggregations?.datasets.buckets.map((bucket) => ({
dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`,
count: bucket.doc_count,
})) ?? [];
const docsCount = [...prevResults.docsCount, ...currTotalDocs];
if (
totalDocsResponse.aggregations?.datasets.after_key &&
totalDocsResponse.aggregations?.datasets.buckets.length === SIZE_LIMIT
) {
return getDegradedDocsPaginated({
esClient,
type,
start,
end,
datasetQuery,
after: {
degradedDocs:
(degradedDocsResponse.aggregations?.datasets.after_key as {
dataset: string;
namespace: string;
}) || after?.degradedDocs,
docsCount:
(totalDocsResponse.aggregations?.datasets.after_key as {
dataset: string;
namespace: string;
}) || after?.docsCount,
},
prevResults: { degradedDocs, docsCount },
});
}
const degradedDocsMap = degradedDocs.reduce(
(acc, curr) => ({
...acc,
[curr.dataset]: curr.count,
}),
{}
);
return docsCount.map((curr) => {
const degradedDocsCount = degradedDocsMap[curr.dataset as keyof typeof degradedDocsMap] || 0;
return {
...curr,
docsCount: curr.count,
count: degradedDocsCount,
percentage: (degradedDocsCount / curr.count) * 100,
};
});
}

View file

@ -10,12 +10,12 @@ import {
DataStreamDetails,
DataStreamSettings,
DataStreamStat,
DegradedDocs,
NonAggregatableDatasets,
DegradedFieldResponse,
DatasetUserPrivileges,
DegradedFieldValues,
DegradedFieldAnalysis,
DataStreamDocsStat,
UpdateFieldLimitResponse,
DataStreamRolloverResponse,
} from '../../../common/api_types';
@ -31,6 +31,7 @@ import { getDegradedFields } from './get_degraded_fields';
import { getDegradedFieldValues } from './get_degraded_field_values';
import { analyzeDegradedField } from './get_degraded_field_analysis';
import { getDataStreamsMeteringStats } from './get_data_streams_metering_stats';
import { getAggregatedDatasetPaginatedResults } from './get_dataset_aggregated_paginated_results';
import { updateFieldLimit } from './update_field_limit';
import { createDatasetQualityESClient } from '../../utils';
@ -97,7 +98,7 @@ const degradedDocsRoute = createDatasetQualityServerRoute({
params: t.type({
query: t.intersection([
rangeRt,
typeRt,
t.type({ types: typesRt }),
t.partial({
datasetQuery: t.string,
}),
@ -107,19 +108,13 @@ const degradedDocsRoute = createDatasetQualityServerRoute({
tags: [],
},
async handler(resources): Promise<{
degradedDocs: DegradedDocs[];
degradedDocs: DataStreamDocsStat[];
}> {
const { context, params } = resources;
const coreContext = await context.core;
const esClient = coreContext.elasticsearch.client.asCurrentUser;
await datasetQualityPrivileges.throwIfCannotReadDataset(
esClient,
params.query.type,
params.query.datasetQuery
);
const degradedDocs = await getDegradedDocsPaginated({
esClient,
...params.query,
@ -131,6 +126,39 @@ const degradedDocsRoute = createDatasetQualityServerRoute({
},
});
const totalDocsRoute = createDatasetQualityServerRoute({
endpoint: 'GET /internal/dataset_quality/data_streams/total_docs',
params: t.type({
query: t.intersection([rangeRt, typeRt]),
}),
options: {
tags: [],
},
async handler(resources): Promise<{
totalDocs: DataStreamDocsStat[];
}> {
const { context, params } = resources;
const coreContext = await context.core;
const esClient = coreContext.elasticsearch.client.asCurrentUser;
await datasetQualityPrivileges.throwIfCannotReadDataset(esClient, params.query.type);
const { type, start, end } = params.query;
const totalDocs = await getAggregatedDatasetPaginatedResults({
esClient,
start,
end,
index: `${type}-*-*`,
});
return {
totalDocs,
};
},
});
const nonAggregatableDatasetsRoute = createDatasetQualityServerRoute({
endpoint: 'GET /internal/dataset_quality/data_streams/non_aggregatable',
params: t.type({
@ -383,6 +411,7 @@ const rolloverDataStream = createDatasetQualityServerRoute({
export const dataStreamsRouteRepository = {
...statsRoute,
...degradedDocsRoute,
...totalDocsRoute,
...nonAggregatableDatasetsRoute,
...nonAggregatableDatasetRoute,
...degradedFieldsRoute,

View file

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { log, timerange } from '@kbn/apm-synthtrace-client';
import expect from '@kbn/expect';
import { APIClientRequestParamsOf } from '@kbn/dataset-quality-plugin/common/rest';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import { RoleCredentials, SupertestWithRoleScopeType } from '../../../services';
/**
 * Deployment-agnostic FTR suite for
 * `GET /internal/dataset_quality/data_streams/total_docs`: indexes one log
 * document into each of two logs data streams via synthtrace, then asserts
 * the endpoint reports a per-data-stream document count for the queried
 * time range, and returns nothing for a range with no documents.
 */
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
  const samlAuth = getService('samlAuth');
  const roleScopedSupertest = getService('roleScopedSupertest');
  const synthtrace = getService('logsSynthtraceEsClient');
  // One-minute fixture window; at rate(1)/interval('1m') each dataset below
  // receives exactly one document inside it.
  const from = '2024-09-20T11:00:00.000Z';
  const to = '2024-09-20T11:01:00.000Z';
  const dataStreamType = 'logs';
  const dataset = 'synth';
  const syntheticsDataset = 'synthetics';
  const namespace = 'default';
  const serviceName = 'my-service';
  const hostName = 'synth-host';
  const dataStreamName = `${dataStreamType}-${dataset}-${namespace}`;
  const syntheticsDataStreamName = `${dataStreamType}-${syntheticsDataset}-${namespace}`;
  const endpoint = 'GET /internal/dataset_quality/data_streams/total_docs';
  type ApiParams = APIClientRequestParamsOf<typeof endpoint>['params']['query'];
  // Thin wrapper issuing the GET with cookie-scoped role credentials.
  async function callApiAs({
    roleScopedSupertestWithCookieCredentials,
    apiParams: { type, start, end },
  }: {
    roleScopedSupertestWithCookieCredentials: SupertestWithRoleScopeType;
    apiParams: ApiParams;
  }) {
    return roleScopedSupertestWithCookieCredentials
      .get(`/internal/dataset_quality/data_streams/total_docs`)
      .query({
        type,
        start,
        end,
      });
  }
  describe('DataStream total docs', function () {
    let adminRoleAuthc: RoleCredentials;
    let supertestAdminWithCookieCredentials: SupertestWithRoleScopeType;
    before(async () => {
      adminRoleAuthc = await samlAuth.createM2mApiKeyWithRoleScope('admin');
      supertestAdminWithCookieCredentials = await roleScopedSupertest.getSupertestWithRoleScope(
        'admin',
        {
          useCookieHeader: true,
          withInternalHeaders: true,
        }
      );
      // Generate one document per dataset per interval tick (a single tick
      // here), so each of the two data streams ends up with one document.
      await synthtrace.index([
        timerange(from, to)
          .interval('1m')
          .rate(1)
          .generator((timestamp) => [
            log
              .create()
              .message('This is a log message')
              .timestamp(timestamp)
              .dataset(dataset)
              .namespace(namespace)
              .defaults({
                'log.file.path': '/my-service.log',
                'service.name': serviceName,
                'host.name': hostName,
              }),
            log
              .create()
              .message('This is a log message')
              .timestamp(timestamp)
              .dataset(syntheticsDataset)
              .namespace(namespace)
              .defaults({
                'log.file.path': '/my-service.log',
                'service.name': serviceName,
                'host.name': hostName,
              }),
          ]),
      ]);
    });
    after(async () => {
      // Remove indexed fixtures and revoke the API key created in `before`.
      await synthtrace.clean();
      await samlAuth.invalidateM2mApiKeyWithRoleScope(adminRoleAuthc);
    });
    it('returns number of documents per DataStream', async () => {
      const resp = await callApiAs({
        roleScopedSupertestWithCookieCredentials: supertestAdminWithCookieCredentials,
        apiParams: {
          type: dataStreamType,
          start: from,
          end: to,
        },
      });
      // NOTE(review): assertions assume a deterministic bucket order
      // (synth before synthetics) — presumably composite-key ordering; confirm.
      expect(resp.body.totalDocs.length).to.be(2);
      expect(resp.body.totalDocs[0].dataset).to.be(dataStreamName);
      expect(resp.body.totalDocs[0].count).to.be(1);
      expect(resp.body.totalDocs[1].dataset).to.be(syntheticsDataStreamName);
      expect(resp.body.totalDocs[1].count).to.be(1);
    });
    it('returns empty when all documents are outside timeRange', async () => {
      const resp = await callApiAs({
        roleScopedSupertestWithCookieCredentials: supertestAdminWithCookieCredentials,
        apiParams: {
          type: dataStreamType,
          // One day after the fixture window: no documents should match.
          start: '2024-09-21T11:00:00.000Z',
          end: '2024-09-21T11:01:00.000Z',
        },
      });
      expect(resp.body.totalDocs.length).to.be(0);
    });
  });
}

View file

@ -14,5 +14,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
loadTestFile(require.resolve('./data_stream_settings'));
loadTestFile(require.resolve('./data_stream_rollover'));
loadTestFile(require.resolve('./update_field_limit'));
loadTestFile(require.resolve('./data_stream_total_docs'));
});
}

View file

@ -7,8 +7,7 @@
import { log, timerange } from '@kbn/apm-synthtrace-client';
import expect from '@kbn/expect';
import { DatasetQualityApiError } from '../../common/dataset_quality_api_supertest';
import { expectToReject } from '../../utils';
import rison from '@kbn/rison';
import { DatasetQualityApiClientKey } from '../../common/config';
import { FtrProviderContext } from '../../common/ftr_provider_context';
@ -24,7 +23,7 @@ export default function ApiTest({ getService }: FtrProviderContext) {
endpoint: 'GET /internal/dataset_quality/data_streams/degraded_docs',
params: {
query: {
type: 'logs',
types: rison.encodeArray(['logs']),
start,
end,
},
@ -33,13 +32,6 @@ export default function ApiTest({ getService }: FtrProviderContext) {
}
registry.when('Degraded docs', { config: 'basic' }, () => {
describe('authorization', () => {
it('should return a 403 when the user does not have sufficient privileges', async () => {
const err = await expectToReject<DatasetQualityApiError>(() => callApiAs('noAccessUser'));
expect(err.res.status).to.be(403);
});
});
describe('and there are log documents', () => {
before(async () => {
await synthtrace.index([
@ -75,25 +67,19 @@ export default function ApiTest({ getService }: FtrProviderContext) {
it('returns stats correctly', async () => {
const stats = await callApiAs('datasetQualityMonitorUser');
expect(stats.body.degradedDocs.length).to.be(2);
expect(stats.body.degradedDocs.length).to.be(1);
const degradedDocsStats = stats.body.degradedDocs.reduce(
(acc, curr) => ({
...acc,
[curr.dataset]: {
percentage: curr.percentage,
count: curr.count,
},
}),
{} as Record<string, { percentage: number; count: number }>
{} as Record<string, { count: number }>
);
expect(degradedDocsStats['logs-synth.1-default']).to.eql({
percentage: 0,
count: 0,
});
expect(degradedDocsStats['logs-synth.2-default']).to.eql({
percentage: 100,
count: 1,
});
});
@ -155,117 +141,45 @@ export default function ApiTest({ getService }: FtrProviderContext) {
it('returns counts and list of datasets correctly', async () => {
const stats = await callApiAs('datasetQualityMonitorUser');
expect(stats.body.degradedDocs.length).to.be(18);
expect(stats.body.degradedDocs.length).to.be(9);
const expected = {
degradedDocs: [
{
dataset: 'logs-apache.access-default',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-apache.access-space1',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-apache.access-space2',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-apache.error-default',
count: 1,
docsCount: 2,
percentage: 50,
},
{
dataset: 'logs-apache.error-space1',
count: 1,
docsCount: 2,
percentage: 50,
},
{
dataset: 'logs-apache.error-space2',
count: 1,
docsCount: 2,
percentage: 50,
},
{
dataset: 'logs-mysql.access-default',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-mysql.access-space1',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-mysql.access-space2',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-mysql.error-default',
count: 1,
docsCount: 2,
percentage: 50,
},
{
dataset: 'logs-mysql.error-space1',
count: 1,
docsCount: 2,
percentage: 50,
},
{
dataset: 'logs-mysql.error-space2',
count: 1,
docsCount: 2,
percentage: 50,
},
{
dataset: 'logs-nginx.access-default',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-nginx.access-space1',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-nginx.access-space2',
count: 0,
docsCount: 1,
percentage: 0,
},
{
dataset: 'logs-nginx.error-default',
count: 1,
docsCount: 2,
percentage: 50,
},
{
dataset: 'logs-nginx.error-space1',
count: 1,
docsCount: 2,
percentage: 50,
},
{
dataset: 'logs-nginx.error-space2',
count: 1,
docsCount: 2,
percentage: 50,
},
],
};

View file

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { DatasetQualityApiClientKey } from '../../common/config';
import { FtrProviderContext } from '../../common/ftr_provider_context';
import { expectToReject } from '../../utils';
import { DatasetQualityApiError } from '../../common/dataset_quality_api_supertest';
/**
 * Authorization check for
 * `GET /internal/dataset_quality/data_streams/total_docs`: a user with no
 * access to the datasets must receive a 403.
 */
export default function ApiTest({ getService }: FtrProviderContext) {
  const registry = getService('registry');
  const apiClient = getService('datasetQualityApiClient');
  // Arbitrary fixed one-minute window; this suite indexes no data.
  const start = '2023-12-11T18:00:00.000Z';
  const end = '2023-12-11T18:01:00.000Z';
  // Issues the total_docs request on behalf of the given test user.
  const callApiAs = (user: DatasetQualityApiClientKey) =>
    apiClient[user]({
      endpoint: 'GET /internal/dataset_quality/data_streams/total_docs',
      params: {
        query: { type: 'logs', start, end },
      },
    });
  registry.when('Total docs', { config: 'basic' }, () => {
    describe('authorization', () => {
      it('should return a 403 when the user does not have sufficient privileges', async () => {
        const err = await expectToReject<DatasetQualityApiError>(() => callApiAs('noAccessUser'));
        expect(err.res.status).to.be(403);
      });
    });
  });
}